Skip to content

Commit 0899b78

Browse files
authored
Merge pull request #7 from Innmind/non-blocking-pool
Add non blocking stream pool
2 parents ea710d3 + 34c4c81 commit 0899b78

File tree

4 files changed

+92
-0
lines changed

4 files changed

+92
-0
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## [Unreleased]
44

5+
### Added
6+
7+
- `Innmind\IO\Streams\Stream\Read\Pool::nonBlocking()`
8+
59
### Fixed
610

711
- `Innmind\IO\Streams\Stream\Read\Pool` was losing the id type when calling `->watch()`, `->timeoutAfter()` or `->toEncoding()`

proofs/streams.php

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,59 @@ static function($assert, $a, $b, $encoding) {
306306
},
307307
);
308308

309+
yield proof(
310+
'IO::streams()->acquire()->read()->pool()->nonBlocking()->chunks()',
311+
given(
312+
$string,
313+
$string,
314+
Set::of(...Str\Encoding::cases()),
315+
),
316+
static function($assert, $a, $b, $encoding) {
317+
$tmpA = \tmpfile();
318+
\fwrite($tmpA, $a);
319+
$tmpB = \tmpfile();
320+
\fwrite($tmpB, $b);
321+
322+
$io = IO::fromAmbientAuthority();
323+
$chunks = $io
324+
->streams()
325+
->acquire($tmpA)
326+
->read()
327+
->pool('a')
328+
->with(
329+
'b',
330+
$io
331+
->streams()
332+
->acquire($tmpB)
333+
->read(),
334+
)
335+
->watch()
336+
->toEncoding($encoding)
337+
->nonBlocking()
338+
->chunks();
339+
340+
$assert->same(2, $chunks->size());
341+
$chunks->foreach(static fn($chunk) => $assert->same(
342+
$encoding,
343+
$chunk->value()->encoding(),
344+
));
345+
$assert->same(
346+
[$a],
347+
$chunks
348+
->filter(static fn($chunk) => $chunk->key() === 'a')
349+
->map(static fn($chunk) => $chunk->value()->toString())
350+
->toList(),
351+
);
352+
$assert->same(
353+
[$b],
354+
$chunks
355+
->filter(static fn($chunk) => $chunk->key() === 'b')
356+
->map(static fn($chunk) => $chunk->value()->toString())
357+
->toList(),
358+
);
359+
},
360+
);
361+
309362
yield proof(
310363
'IO::streams()->acquire()->close()',
311364
given($string),

src/Streams/Stream/Read.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public function internal(): Stream
7171
}
7272

7373
/**
74+
* This doesn't affect the blocking when pooling.
75+
*
76+
* To prevent having some streams being blocking and others not, the non
77+
* blocking have to be explicitly called on the pool.
78+
*
7479
* @psalm-mutation-free
7580
*/
7681
public function nonBlocking(): self

src/Streams/Stream/Read/Pool.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ private function __construct(
3030
private Map $streams,
3131
private ?Period $timeout,
3232
private ?Str\Encoding $encoding,
33+
private bool $blocking,
3334
) {
3435
}
3536

@@ -51,6 +52,7 @@ public static function of(
5152
Map::of([$stream->internal(), $id]),
5253
null,
5354
null,
55+
true,
5456
);
5557
}
5658

@@ -69,6 +71,7 @@ public function with(mixed $id, Read $stream): self
6971
($this->streams)($stream->internal(), $id),
7072
$this->timeout,
7173
$this->encoding,
74+
$this->blocking,
7275
);
7376
}
7477

@@ -90,6 +93,7 @@ public function watch(): self
9093
$this->streams,
9194
null,
9295
$this->encoding,
96+
$this->blocking,
9397
);
9498
}
9599

@@ -103,6 +107,7 @@ public function timeoutAfter(Period $timeout): self
103107
$this->streams,
104108
$timeout,
105109
$this->encoding,
110+
$this->blocking,
106111
);
107112
}
108113

@@ -116,6 +121,21 @@ public function toEncoding(Str\Encoding $encoding): self
116121
$this->streams,
117122
$this->timeout,
118123
$encoding,
124+
$this->blocking,
125+
);
126+
}
127+
128+
/**
129+
* @return self<T>
130+
*/
131+
public function nonBlocking(): self
132+
{
133+
return new self(
134+
$this->capabilities,
135+
$this->streams,
136+
$this->timeout,
137+
$this->encoding,
138+
false,
119139
);
120140
}
121141

@@ -124,6 +144,7 @@ public function toEncoding(Str\Encoding $encoding): self
124144
*/
125145
public function chunks(): Sequence
126146
{
147+
$blocking = $this->blocking;
127148
$watch = $this->capabilities->watch();
128149
$watch = match ($this->timeout) {
129150
null => $watch->waitForever(),
@@ -133,6 +154,15 @@ public function chunks(): Sequence
133154
->streams
134155
->keys()
135156
->filter(static fn($stream) => !$stream->closed())
157+
->flatMap(
158+
static fn($stream) => (match ($blocking) {
159+
true => $stream->blocking(),
160+
false => $stream->nonBlocking(),
161+
})
162+
->map(static fn() => $stream)
163+
->toSequence()
164+
->toSet(),
165+
)
136166
->reduce(
137167
$watch,
138168
static fn(Internal\Watch $watch, $stream) => $watch->forRead($stream),

0 commit comments

Comments
 (0)