Right now, piping to a writable stream with { highWaterMark: 0 } stalls indefinitely:
const rs = new ReadableStream({
start(c) {
c.enqueue("a");
c.enqueue("b");
c.enqueue("c");
c.close();
}
});
const ws = new WritableStream({
write(chunk) {
console.log("wrote:", chunk);
}
}, { highWaterMark: 0 });
rs.pipeTo(ws); // never resolves, and no messages are logged

This makes it impossible to pipe through a TransformStream without increasing the total queue size of a pipe chain by at least one chunk:
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
}, { highWaterMark: 0 }, { highWaterMark: 0 });
rs.pipeThrough(upperCaseTransform).pipeTo(ws); // stalls indefinitely
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
}, { highWaterMark: 1 }, { highWaterMark: 0 }); // same as default strategies
rs.pipeThrough(upperCaseTransform); // works, but already pulls the first chunk from `rs`

This is unfortunate, since there are many use cases for synchronous TransformStreams that shouldn't need buffering (i.e. every call to transform() immediately results in at least one enqueue()):
- A generic mapTransform(fn), similar to array.map(fn):
  function mapTransform(fn) {
    return new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(fn(chunk));
      }
    });
  }
  rs.pipeThrough(mapTransform(x => x.toUpperCase()));
- TextEncoderStream and TextDecoderStream from Encoding.
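To make the "already pulls the first chunk" behavior of the default strategies easier to observe, here is a minimal sketch. It assumes a runtime with global ReadableStream and TransformStream (a recent browser or Node.js 18+). The helper name countEagerPulls and the 50 ms wait are only there for illustration.

```javascript
// Counts how often the source is pulled while nobody reads from the
// readable end of the transform stream.
async function countEagerPulls() {
  let pulls = 0;
  const source = new ReadableStream({
    pull(controller) {
      pulls++;
      controller.enqueue("chunk " + pulls);
    }
  }, { highWaterMark: 0 }); // HWM 0: the source is only pulled on demand

  // Default strategies: writable HWM 1, readable HWM 0.
  // The returned readable is deliberately left without a reader.
  source.pipeThrough(new TransformStream());

  // Arbitrary delay (an assumption of this sketch) to let the pipe run.
  await new Promise(resolve => setTimeout(resolve, 50));
  return pulls;
}

countEagerPulls().then(pulls => {
  console.log("pulls without any reader:", pulls);
});
```

If I read the pipe algorithm correctly, the count is non-zero even though nothing ever asked for data: the writable side's queue slot is filled before any consumer shows up.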
Prior discussions on this topic concluded that this is not possible. With a highWaterMark of 0, writer.desiredSize is always <= 0, so writer.ready is always pending:
- Change default readableStrategy HWM to 0? #777:
We can't reduce the HWM of the writableStrategy to 0 because it would have permanent backpressure preventing the pipe from working.
- #1083:
Yes. As you observed, a writable stream with a HWM of 0 will always have backpressure. So adding an identity TransformStream to a pipe can't be a complete no-op: it always increases the total queue size by 1.
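The permanent backpressure described in these comments can be checked directly on a writer. This is a rough sketch, assuming global WritableStream support. The helper name probeZeroHwmWriter and the 50 ms timeout are illustrative choices, and a promise that has not settled within the timeout is only treated as pending.

```javascript
// Inspects a writer on a WritableStream created with highWaterMark: 0.
async function probeZeroHwmWriter() {
  const ws = new WritableStream({
    write(chunk) {
      console.log("wrote:", chunk);
    }
  }, { highWaterMark: 0 });
  const writer = ws.getWriter();

  // Race writer.ready against a timer to see whether it settles at all.
  const PENDING = Symbol("pending");
  const timeout = new Promise(resolve => {
    setTimeout(() => resolve(PENDING), 50);
  });
  const outcome = await Promise.race([
    writer.ready.then(() => "ready"),
    timeout
  ]);

  return {
    desiredSize: writer.desiredSize,
    readySettled: outcome !== PENDING
  };
}

probeZeroHwmWriter().then(result => console.log(result));
```

Note that calling writer.write() directly would still reach the sink. It is pipeTo() that waits for writer.ready before each write, which is why the pipe stalls.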
But that got me thinking. A ReadableStream's source can be pull()ed as a result of reader.read(), even if controller.desiredSize <= 0. Maybe a WritableStream's sink should then also be able to release backpressure even if writer.desiredSize <= 0? 🤔
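For comparison, this is the readable-side behavior I am referring to, as a small sketch (again assuming global ReadableStream support; pullOnRead and the 50 ms wait are just for illustration). The source is not pulled while the stream sits idle with a HWM of 0, but a pending read() triggers pull() anyway.

```javascript
// Shows that read() triggers pull() even though the stream's HWM is 0.
async function pullOnRead() {
  const events = [];
  const rs = new ReadableStream({
    pull(controller) {
      // Record the desiredSize seen at the time of the pull.
      events.push("pull (desiredSize = " + controller.desiredSize + ")");
      controller.enqueue("a");
    }
  }, { highWaterMark: 0 });

  // Wait a bit without reading: no demand, so no pull is expected.
  await new Promise(resolve => setTimeout(resolve, 50));
  const pullsBeforeRead = events.length;

  // A read request creates demand regardless of desiredSize.
  const reader = rs.getReader();
  const { value } = await reader.read();

  return { pullsBeforeRead, pullsAfterRead: events.length, value, events };
}

pullOnRead().then(result => console.log(result));
```

The writable side has no equivalent of this "demand overrides desiredSize" path, which is the asymmetry the rest of this proposal tries to address.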
We could add a method on WritableStreamDefaultController (controller.pull()? controller.releaseBackpressure()? controller.notifyReady()?) that would have the result of immediately resolving the current writer.ready promise. Internally, we would do something like WritableStreamUpdateBackpressure(stream, false). My hope is that we can then use this inside TransformStreamSetBackpressure(), so that pulling from the readable end of a transform stream would also resolve ready on the writable end.
...Or am I missing something very obvious? 😛