Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 120 additions & 2 deletions node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ This package provides two sub-modules:
- `@effectionx/node/stream` - Stream utilities for Node.js
- `@effectionx/node/events` - Event utilities for Node.js EventEmitters

You can also import everything from the main module:
You can also import everything — including the `Stdio` context API —
from the main module:

```typescript
import { fromReadable, on, once } from "@effectionx/node";
import { Stdio, fromReadable, on, once, stdin, stdout } from "@effectionx/node";
```

## Stream Utilities
Expand All @@ -49,6 +50,123 @@ await main(function* () {
The returned stream emits `Uint8Array` chunks and automatically cleans up
event listeners when the stream is closed or the operation is shut down.

## Host Stdio

`Stdio` is a middleware-capable context API for the **host** process's
standard input, output, and error streams. The default handlers read
from `process.stdin` and write to `process.stdout` / `process.stderr`,
so in normal production code you just call the operations and bytes
flow as you'd expect. Install middleware with `Stdio.around({ ... })`
inside any scope to observe, transform, or redirect those bytes — useful
for tests that assert what a program wrote to stdout, or harnesses that
feed synthesized input to code reading from stdin.

This is distinct from `@effectionx/process`'s `Stdio`, which governs
**child**-process stdio.

### Writing to stdout / stderr

`stdout` and `stderr` are `(bytes: Uint8Array) => Operation<void>`
operations destructured from `Stdio.operations` and re-exported at the
package root, so you can use them directly:

```typescript
import { main } from "effection";
import { stderr, stdout } from "@effectionx/node";

await main(function* () {
yield* stdout(new TextEncoder().encode("hello\n"));
yield* stderr(new TextEncoder().encode("oops\n"));
});
```

### Reading stdin

`stdin()` returns a `Stream<Uint8Array, void>` sourced from
`process.stdin`. A single `yield*` gives you a subscription you can
iterate, or use `each` for a `for`-loop:

```typescript
import { each, main } from "effection";
import { stdin, stdout } from "@effectionx/node";

await main(function* () {
// echo every chunk back to stdout
for (const chunk of yield* each(stdin())) {
yield* stdout(chunk);
yield* each.next();
}
});
```

### Intercepting with middleware

Middleware is registered per scope via `Stdio.around(...)`. Each member
is a function `(args, next) => TReturn` where delegation to the next
link (including the default handler) is `next(...args)`. Middleware
applies to the current scope and its descendants until they exit.

```typescript
import { main } from "effection";
import { Stdio, stdout } from "@effectionx/node";

await main(function* () {
const captured: Uint8Array[] = [];

yield* Stdio.around({
*stdout(args, next) {
captured.push(args[0]);
return yield* next(...args); // delegate so bytes still reach process.stdout
},
});

yield* stdout(new TextEncoder().encode("hello\n"));
// `captured` holds the bytes, and they also reached the terminal
});
```

To **redirect** without reaching the default handler, simply don't call
`next`:

```typescript
yield* Stdio.around({
*stdout(args, _next) {
captured.push(args[0]);
// no call to next → nothing is written to process.stdout
},
});
```

### Substituting stdin

Because `stdin()` returns a `Stream` directly, a `stdin` middleware is
a plain function that returns a replacement stream — useful for
feeding a synthetic input sequence from a test:

```typescript
import { createSignal, each, main } from "effection";
import { Stdio, stdin } from "@effectionx/node";

await main(function* () {
const signal = createSignal<Uint8Array, void>();

yield* Stdio.around({
stdin(_args, _next) {
return signal;
},
});

// From elsewhere in your test, drive the stream:
// signal.send(new TextEncoder().encode("line 1\n"));
// signal.close();

for (const chunk of yield* each(stdin())) {
// ...handle synthesized input
yield* each.next();
}
});
```

## Event Utilities

### on()
Expand Down
1 change: 1 addition & 0 deletions node/mod.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./stream.ts";
export * from "./events.ts";
export * from "./stdio.ts";
5 changes: 4 additions & 1 deletion node/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@effectionx/node",
"description": "Node.js stream and event emitter adapters for Effection",
"version": "0.2.4",
"version": "0.3.0",
"keywords": ["io", "streams"],
"type": "module",
"main": "./dist/mod.js",
Expand All @@ -26,6 +26,9 @@
"default": "./dist/events.js"
}
},
"dependencies": {
"@effectionx/context-api": "workspace:*"
},
"peerDependencies": {
"effection": "^3 || ^4"
},
Expand Down
220 changes: 220 additions & 0 deletions node/stdio.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import process from "node:process";
import { PassThrough, Writable } from "node:stream";
import { describe, it } from "@effectionx/vitest";
import {
createSignal,
scoped,
type Stream,
type Subscription,
} from "effection";
import { expect } from "expect";

import { Stdio, stderr, stdin, stdout } from "./stdio.ts";

function captureWritable(): { stream: Writable; written: Uint8Array[] } {
const written: Uint8Array[] = [];
const stream = new Writable({
write(chunk: Uint8Array, _enc, cb) {
written.push(chunk);
cb();
},
});
return { stream, written };
}

function overrideStdio(
key: "stdin" | "stdout" | "stderr",
value: unknown,
): PropertyDescriptor {
const original = Object.getOwnPropertyDescriptor(process, key)!;
Object.defineProperty(process, key, { configurable: true, value });
return original;
}

function* drain(subscription: Subscription<Uint8Array, void>) {
const decoder = new TextDecoder();
const chunks: string[] = [];
let r = yield* subscription.next();
while (!r.done) {
chunks.push(decoder.decode(r.value));
r = yield* subscription.next();
}
return chunks;
}

describe("Stdio middleware", () => {
it("captures stdout bytes via middleware and delegates to process.stdout", function* () {
const { stream: fakeStdout, written } = captureWritable();
const original = overrideStdio("stdout", fakeStdout);

try {
const captured: Uint8Array[] = [];
yield* Stdio.around({
*stdout(args, next) {
captured.push(args[0]);
return yield* next(...args);
},
});

const bytes = new TextEncoder().encode("hello\n");
yield* stdout(bytes);

const decoder = new TextDecoder();
expect(captured.length).toBe(1);
expect(decoder.decode(captured[0])).toBe("hello\n");
expect(written.length).toBe(1);
expect(decoder.decode(written[0])).toBe("hello\n");
} finally {
Object.defineProperty(process, "stdout", original);
}
});

it("captures stderr bytes via middleware and delegates to process.stderr", function* () {
const { stream: fakeStderr, written } = captureWritable();
const original = overrideStdio("stderr", fakeStderr);

try {
const captured: Uint8Array[] = [];
yield* Stdio.around({
*stderr(args, next) {
captured.push(args[0]);
return yield* next(...args);
},
});

const bytes = new TextEncoder().encode("oops\n");
yield* stderr(bytes);

const decoder = new TextDecoder();
expect(captured.length).toBe(1);
expect(decoder.decode(captured[0])).toBe("oops\n");
expect(written.length).toBe(1);
expect(decoder.decode(written[0])).toBe("oops\n");
} finally {
Object.defineProperty(process, "stderr", original);
}
});

it("can substitute stdin with a synthetic stream and propagates completion", function* () {
const signal = createSignal<Uint8Array, void>();
const synthetic: Stream<Uint8Array, void> = signal;

yield* Stdio.around({
stdin(_args, _next) {
return synthetic;
},
});

const subscription = yield* stdin();

const encoder = new TextEncoder();
signal.send(encoder.encode("one"));
signal.send(encoder.encode("two"));
signal.close();

const decoder = new TextDecoder();

let result = yield* subscription.next();
if (result.done) {
throw new Error("expected first chunk before end-of-stream");
}
expect(decoder.decode(result.value)).toBe("one");

result = yield* subscription.next();
if (result.done) {
throw new Error("expected second chunk before end-of-stream");
}
expect(decoder.decode(result.value)).toBe("two");

result = yield* subscription.next();
expect(result.done).toBe(true);
});

it("middleware is scoped and does not leak", function* () {
const { stream: fakeStdout } = captureWritable();
const original = overrideStdio("stdout", fakeStdout);

try {
const outerCalls: string[] = [];
yield* Stdio.around({
*stdout(args, next) {
outerCalls.push("outer");
return yield* next(...args);
},
});

const bytes = new TextEncoder().encode("hi\n");
yield* stdout(bytes);
expect(outerCalls).toEqual(["outer"]);

yield* scoped(function* () {
const innerCalls: string[] = [];
yield* Stdio.around({
*stdout(args, next) {
innerCalls.push("inner");
return yield* next(...args);
},
});

yield* stdout(bytes);
expect(outerCalls).toEqual(["outer", "outer"]);
expect(innerCalls).toEqual(["inner"]);
});

outerCalls.length = 0;
yield* stdout(bytes);
expect(outerCalls).toEqual(["outer"]);
} finally {
Object.defineProperty(process, "stdout", original);
}
});
});

describe("Stdio defaults", () => {
it("reads bytes from process.stdin by default", function* () {
const fake = new PassThrough();
const original = overrideStdio("stdin", fake);

try {
const subscription = yield* stdin();

fake.write(Buffer.from("hello\n"));
fake.end();

const decoder = new TextDecoder();

let result = yield* subscription.next();
if (result.done) {
throw new Error("expected a chunk before end-of-stream");
}
expect(decoder.decode(result.value)).toBe("hello\n");

result = yield* subscription.next();
expect(result.done).toBe(true);
} finally {
Object.defineProperty(process, "stdin", original);
}
});

it("supports multiple concurrent stdin() consumers", function* () {
const fake = new PassThrough();
const original = overrideStdio("stdin", fake);

try {
const left = yield* stdin();
const right = yield* stdin();

fake.write(Buffer.from("one"));
fake.write(Buffer.from("two"));
fake.end();

const leftChunks = yield* drain(left);
const rightChunks = yield* drain(right);

expect(leftChunks).toEqual(["one", "two"]);
expect(rightChunks).toEqual(["one", "two"]);
} finally {
Object.defineProperty(process, "stdin", original);
}
});
});
Loading
Loading