Skip to content

Commit 3329c49

Browse files
committed
Update docs
1 parent 88a0b36 commit 3329c49

1 file changed

Lines changed: 49 additions & 47 deletions

File tree

  • src/routes/docs/architecture/stream-cancellation

src/routes/docs/architecture/stream-cancellation/+page.md

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,33 @@ OpenWorkers detects client disconnection at multiple levels:
1818

1919
```
2020
┌─────────────────────────────────────────────────────────────────┐
21-
│ Client Disconnects
21+
│ Client Disconnects │
2222
└─────────────────────────────────────────────────────────────────┘
2323
2424
2525
┌─────────────────────────────────────────────────────────────────┐
26-
Actix-web / HTTP Layer │
27-
28-
│ • Detects TCP connection closed
29-
│ • Drops the response stream receiver
26+
Hyper / HTTP Layer
27+
│ │
28+
│ • Detects TCP connection closed │
29+
│ • Drops the response stream (StreamBody::drop)
3030
└─────────────────────────────────────────────────────────────────┘
3131
3232
3333
┌─────────────────────────────────────────────────────────────────┐
34-
│ StreamManager (Rust)
35-
36-
│ • Channel sender is dropped
37-
│ • has_sender(stream_id) returns false
38-
│ • try_write_chunk() returns error
34+
│ StreamManager (Rust) │
35+
│ │
36+
│ • Channel sender is dropped │
37+
│ • has_sender(stream_id) returns false │
38+
│ • try_write_chunk() returns error │
3939
└─────────────────────────────────────────────────────────────────┘
4040
4141
4242
┌─────────────────────────────────────────────────────────────────┐
43-
│ JavaScript Runtime
44-
45-
│ • controller.signal.aborted = true
46-
│ • enqueue() throws TypeError
47-
│ • __responseStreamIsClosed() returns true
43+
│ JavaScript Runtime │
44+
│ │
45+
│ • controller.signal.aborted = true │
46+
│ • enqueue() throws TypeError │
47+
│ • __responseStreamIsClosed() returns true │
4848
└─────────────────────────────────────────────────────────────────┘
4949
```
5050

@@ -64,8 +64,8 @@ export default {
6464
break;
6565
}
6666

67-
controller.enqueue(`data: event ${i}\n\n`);
68-
await new Promise(r => setTimeout(r, 100));
67+
controller.enqueue(`data: event ${i}\\n\\n`);
68+
await new Promise((r) => setTimeout(r, 100));
6969
}
7070

7171
// Always close the stream (even if aborted)
@@ -91,8 +91,8 @@ export default {
9191
async start(controller) {
9292
try {
9393
for (let i = 0; i < 100; i++) {
94-
controller.enqueue(`data: event ${i}\n\n`);
95-
await new Promise(r => setTimeout(r, 100));
94+
controller.enqueue(`data: event ${i}\\n\\n`);
95+
await new Promise((r) => setTimeout(r, 100));
9696
}
9797
controller.close();
9898
} catch (error) {
@@ -113,12 +113,12 @@ export default {
113113

114114
## Behavior by Response Type
115115

116-
| Response Type | Cancellation Detection | Resource Waste |
117-
|--------------|------------------------|----------------|
118-
| `new Response("json")` | N/A (one-shot) | None |
119-
| `ReadableStream` (user code) | `signal.aborted` or `enqueue()` throws | Minimal |
120-
| `fetch()` forward | Automatic via `__streamResponseBody` | None |
121-
| Processed fetch | Same as ReadableStream | Minimal |
116+
| Response Type | Cancellation Detection | Resource Waste |
117+
| ---------------------------- | -------------------------------------- | -------------- |
118+
| `new Response("json")` | N/A (one-shot) | None |
119+
| `ReadableStream` (user code) | `signal.aborted` or `enqueue()` throws | Minimal |
120+
| `fetch()` forward | Automatic via `__streamResponseBody` | None |
121+
| Processed fetch | Same as ReadableStream | Minimal |
122122

123123
### Simple Response (No Streaming)
124124

@@ -147,7 +147,7 @@ The runtime automatically stops reading from upstream when the client disconnect
147147
const stream = new ReadableStream({
148148
async start(controller) {
149149
while (hasMoreData()) {
150-
if (controller.signal.aborted) break; // Recommended
150+
if (controller.signal.aborted) break; // Recommended
151151
controller.enqueue(getNextChunk());
152152
}
153153
}
@@ -163,16 +163,16 @@ const stream = new ReadableStream({
163163
The streaming response uses two channels:
164164

165165
1. **StreamManager channel** - JS writes chunks via `__responseStreamWrite`, a spawned task reads them
166-
2. **ResponseBody channel** - The spawned task forwards chunks to actix for HTTP delivery
166+
2. **ResponseBody channel** - The spawned task forwards chunks to Hyper for HTTP delivery
167167

168168
```
169169
┌──────────────┐ StreamManager ┌──────────────┐ ResponseBody ┌──────────┐
170-
│ JS Worker │ ──── channel ─────▶ │ Spawned Task │ ──── channel ────▶ │ Actix
170+
│ JS Worker │ ──── channel ─────▶ │ Spawned Task │ ──── channel ────▶ │ Hyper
171171
│ enqueue() │ │ (select!) │ │ HTTP │
172172
└──────────────┘ └──────────────┘ └──────────┘
173173
```
174174

175-
When actix detects client disconnect (TCP write fails), it drops the ResponseBody receiver. The spawned task detects this via `tx.closed()` and calls `stream_manager.close_stream()`.
175+
When Hyper detects client disconnect (TCP write fails), `StreamBody` is dropped. The `Drop` implementation sends a notification via a oneshot channel, which triggers `stream_manager.close_stream()`.
176176

177177
### exec() Loop Detection
178178

@@ -212,29 +212,29 @@ enqueue(chunk) {
212212
}
213213
```
214214
215-
### __streamResponseBody Detection
215+
### \_\_streamResponseBody Detection
216216
217217
For fetch forward and processed responses:
218218
219219
```javascript
220220
// In worker.rs __streamResponseBody
221221
while (true) {
222-
const { value, done } = await reader.read();
223-
if (done) break;
222+
const { value, done } = await reader.read();
223+
if (done) break;
224224
225-
while (!__responseStreamWrite(streamId, chunk)) {
226-
if (__responseStreamIsClosed(streamId)) {
227-
cancelled = true;
228-
break; // Stop forwarding
229-
}
230-
await new Promise(r => setTimeout(r, 1));
225+
while (!__responseStreamWrite(streamId, chunk)) {
226+
if (__responseStreamIsClosed(streamId)) {
227+
cancelled = true;
228+
break; // Stop forwarding
231229
}
230+
await new Promise((r) => setTimeout(r, 1));
231+
}
232232
233-
if (cancelled) break;
233+
if (cancelled) break;
234234
}
235235
236236
if (cancelled) {
237-
await reader.cancel('Client disconnected');
237+
await reader.cancel('Client disconnected');
238238
}
239239
```
240240
@@ -325,7 +325,7 @@ async start(controller) {
325325
There is an inherent delay (typically 1-5 seconds) between when a client disconnects and when the worker detects it. This is due to:
326326
327327
1. **TCP buffering** - Data is buffered at the OS level before being sent
328-
2. **HTTP chunked encoding** - Actix buffers chunks before writing to the socket
328+
2. **HTTP chunked encoding** - Hyper buffers chunks before writing to the socket
329329
3. **Write-based detection** - Disconnect is only detected when a write fails
330330
331331
This is a fundamental limitation of HTTP streaming over TCP, not specific to OpenWorkers. The worker will eventually detect the disconnect and can then clean up.
@@ -355,9 +355,11 @@ timeout 2 curl -N 'https://your-worker.workers.dev/stream'
355355

356356
## Key Files
357357

358-
| File | Purpose |
359-
|------|---------|
360-
| `openworkers-runtime-v8/src/worker.rs` | exec() loop cancellation detection |
361-
| `openworkers-runtime-v8/src/runtime/streams.rs` | ReadableStream with signal support |
362-
| `openworkers-runtime-v8/src/runtime/stream_manager.rs` | Rust-side channel management |
363-
| `openworkers-runtime-v8/src/runtime/bindings/streams.rs` | `__responseStreamIsClosed` binding |
358+
| File | Purpose |
359+
| -------------------------------------------------------- | -------------------------------------------------------------- |
360+
| `openworkers-core/src/http.rs` | `StreamBody` with `Drop` notification for disconnect detection |
361+
| `openworkers-runner/bin/main.rs` | Hyper HTTP server, passes disconnect channel to response |
362+
| `openworkers-runtime-v8/src/worker.rs` | exec() loop cancellation detection |
363+
| `openworkers-runtime-v8/src/runtime/streams.rs` | ReadableStream with signal support |
364+
| `openworkers-runtime-v8/src/runtime/stream_manager.rs` | Rust-side channel management |
365+
| `openworkers-runtime-v8/src/runtime/bindings/streams.rs` | `__responseStreamIsClosed` binding |

0 commit comments

Comments
 (0)