Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c49d220
feat: add @effectionx/durable-streams package
taras Mar 6, 2026
e12d309
style: fix lint and formatting for durable-streams
taras Mar 6, 2026
3c40fe9
fix(durable-streams): accept void-returning workflows
taras Mar 6, 2026
6f58ec5
docs(durable-streams): add durable dinner demo instructions
taras Mar 6, 2026
05478e9
fix(durable-streams): sync tsconfig project references
taras Mar 6, 2026
a41eb3d
refactor(durable-streams): apply CodeRabbit review feedback
taras Mar 6, 2026
5ec3fcb
refactor(durable-streams): convert demo scripts to idiomatic Effection
taras Mar 6, 2026
ef83f0f
refactor(durable-streams): merge server + tailer into single observer…
taras Mar 6, 2026
f7b92aa
refactor(durable-streams): rearrange tmux layout — cook full-height left
taras Mar 6, 2026
ee841c6
fix(durable-streams): fix tailer 404 retry and swap demo layout
taras Mar 6, 2026
7602167
fix(durable-streams): move control pane under cook (bottom-right)
taras Mar 6, 2026
2d63b2b
fix(durable-streams): kill only node process, not the pane shell
taras Mar 6, 2026
08c4504
fix(durable-streams): use pkill -f to kill cook process by name
taras Mar 6, 2026
35794cf
style(durable-streams): fix tsconfig.json formatting
taras Mar 6, 2026
95eee36
refactor(durable-streams): add DurableRuntime interface, remove file-…
taras Mar 7, 2026
7ef26df
style(durable-streams): fix import sorting across test files
taras Mar 7, 2026
0afe467
docs(durable-streams): update README for runtime abstraction and guar…
taras Mar 7, 2026
6a60d2b
fix: address CodeRabbit review feedback for durable-streams
taras Mar 7, 2026
abfe832
docs(durable-streams): add design specification documents
taras Mar 7, 2026
2bdc4f3
feat(durable-streams): add stat method and StatResult to DurableRunti…
taras Mar 7, 2026
e7e4eb7
chore: regenerate preview package
taras Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
559 changes: 559 additions & 0 deletions durable-streams/README.md

Large diffs are not rendered by default.

280 changes: 280 additions & 0 deletions durable-streams/combinators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/**
* Structured concurrency combinators for durable workflows.
*
* durableSpawn, durableAll, durableRace — each wraps child workflows
* with DurableContext (coroutine IDs, Close events) so that structured
* concurrency is fully journaled and replayable.
*
* Each combinator returns Workflow<T> (not Operation<T>) so it can be
* used directly inside a Workflow via yield*. Internally, the infrastructure
* effects (useScope, spawn, all, race) are wrapped with ephemeral() —
* these are durable-safe operations that set up scope/context and don't
* need journaling. See DEC-034.
*
* Child workflows must be Workflow<T> — bare Operations are rejected at
* compile time. Use ephemeral() to explicitly opt in to non-durable
* children.
*
* See protocol spec §7 (structured concurrency), §10 (race semantics).
*/

import {
all as effectionAll,
race as effectionRace,
spawn,
suspend,
useScope,
} from "effection";
import type { Operation, Task } from "effection";
import { type DurableContext, DurableCtx } from "./context.ts";
import { ephemeral } from "./ephemeral.ts";
import { deserializeError, serializeError } from "./serialize.ts";
import type { Close, Json, Workflow, WorkflowValue } from "./types.ts";

// ---------------------------------------------------------------------------
// Internal: wrap a child workflow with DurableContext + Close emission
// ---------------------------------------------------------------------------

/**
* Run a child workflow within a spawned scope, setting up its own
* DurableContext and emitting a Close event when it terminates.
*
* This is the core building block for all structured concurrency combinators.
*
* It:
* 1. Checks if the child already completed (has Close event) — short-circuits
* 2. Sets DurableCtx on the child's scope with the child's coroutineId
* 3. Runs the child workflow (its DurableEffects use the child's coroutineId)
* 4. Appends Close(ok|err) when the child terminates
*
* IMPORTANT: This must be called inside a spawn() so it gets its own scope.
* The caller is responsible for spawn().
*/
function* runDurableChild<T extends WorkflowValue>(
childWorkflow: () => Workflow<T>,
childId: string,
parentCtx: DurableContext,
): Operation<T> {
const { replayIndex, stream } = parentCtx;

// Short-circuit: child already completed in a previous run.
// NOTE: Replay guard validation is not bypassed here — the check phase
// (runCheckPhase in durableRun) already iterated ALL Yield events
// (including this child's) before any workflow code runs. Guards have
// already had a chance to veto stale data. This fast-path only skips
// re-running the child's generator, not the guard validation.
if (replayIndex.hasClose(childId)) {
const closeEvent = replayIndex.getClose(childId)!;
if (closeEvent.result.status === "ok") {
return closeEvent.result.value as T;
} else if (closeEvent.result.status === "err") {
throw deserializeError(closeEvent.result.error);
} else {
// cancelled — this child was cancelled in a previous run (e.g.,
// a race loser). Instead of throwing, we suspend forever. The
// parent combinator (race/all) will cancel this child as part of
// normal structured concurrency teardown, just like the original
// run. The Close(cancelled) event already exists in the journal,
// so we skip re-emitting it (the finally block checks for this).
//
// INVARIANT: This branch is only reachable when a parent combinator
// (durableRace or durableAll with a failed sibling) will cancel this
// child. Close(cancelled) in the journal means the child was
// previously cancelled by structured concurrency, so on replay the
// same combinator will cancel it again. This cannot deadlock.
yield* suspend();
// unreachable — suspend blocks until cancelled
return undefined as T;
}
}

// Set child's DurableContext on this scope
const scope = yield* useScope();
scope.set(DurableCtx, {
replayIndex,
stream,
coroutineId: childId,
childCounter: 0,
});

// Track whether we completed normally or via error, so that
// the finally block can detect cancellation (the remaining case).
let closeEvent: Close | undefined;

try {
// Run the child workflow. DurableEffects inside the child read
// DurableCtx from the scope, so they'll use childId.
const result: T = yield* childWorkflow();

// Record Close(ok) — will be appended in finally
closeEvent = {
type: "close",
coroutineId: childId,
result: { status: "ok", value: result as Json },
};

return result;
} catch (error) {
// Record Close(err) — will be appended in finally
closeEvent = {
type: "close",
coroutineId: childId,
result: {
status: "err",
error: serializeError(
error instanceof Error ? error : new Error(String(error)),
),
},
};

throw error;
} finally {
// If closeEvent is still undefined, the generator was cancelled
// (Effection called iterator.return(), skipping both the normal
// return path and the catch block).
if (!closeEvent) {
closeEvent = {
type: "close",
coroutineId: childId,
result: { status: "cancelled" },
};
}

// Don't re-emit a Close event if one already exists in the journal
// (e.g., a cancelled child being replayed via suspend()).
if (!replayIndex.hasClose(childId)) {
// Append the Close event.
yield* stream.append(closeEvent!);
}
}
}

// ---------------------------------------------------------------------------
// durableSpawn — spawn a single durable child, returns Task<T>
// ---------------------------------------------------------------------------

/**
* Spawn a durable child workflow.
*
* Assigns a deterministic coroutine ID (parentId.N), sets up DurableContext
* on the child scope, and ensures Close events are emitted.
*
* Returns a Task<T> that can be yield*-ed to get the child's result.
*
* Returns Workflow<Task<T>> via ephemeral() — the infrastructure effects
* (useScope, spawn) are durable-safe scope setup that doesn't need
* journaling and re-runs correctly on replay.
*/
export function durableSpawn<T extends WorkflowValue>(
childWorkflow: () => Workflow<T>,
): Workflow<Task<T>> {
return ephemeral(
(function* (): Operation<Task<T>> {
const scope = yield* useScope();
const ctx = scope.expect<DurableContext>(DurableCtx);

// Assign deterministic child ID
const childIndex = ctx.childCounter++;
const childId = `${ctx.coroutineId}.${childIndex}`;

// Spawn the child with durable wrapping
return yield* spawn(() => runDurableChild(childWorkflow, childId, ctx));
})(),
);
}

// ---------------------------------------------------------------------------
// durableAll — fork/join, wait for all children
// ---------------------------------------------------------------------------

/**
* Run multiple durable workflows concurrently and wait for all to complete.
*
* Each child gets a deterministic coroutine ID (parentId.0, parentId.1, ...).
* Each child's effects are journaled under its own coroutineId.
* Each child emits a Close event on termination.
*
* If any child fails, remaining children are cancelled (fail-fast,
* Effection's default structured concurrency behavior via all()).
*
* Returns an array of results in the same order as the input workflows.
*
* See spec §7, §11.5.
*/
export function durableAll<T extends WorkflowValue>(
workflows: (() => Workflow<T>)[],
): Workflow<T[]> {
return ephemeral(
(function* (): Operation<T[]> {
const scope = yield* useScope();
const ctx = scope.expect<DurableContext>(DurableCtx);

// Build child Operations, one per workflow. Each gets its own
// deterministic coroutineId and Close event handling.
const childOps: Operation<T>[] = workflows.map((workflow) => {
const childIndex = ctx.childCounter++;
const childId = `${ctx.coroutineId}.${childIndex}`;

return {
*[Symbol.iterator]() {
return yield* runDurableChild(workflow, childId, ctx);
},
};
});

// Delegate to Effection's native all() which uses trap() internally
// for proper error isolation. This means:
// - Child errors are catchable by the caller via try/catch
// - When any child fails, remaining siblings are cancelled
// - The error propagates with the original message intact
return yield* effectionAll(childOps);
})(),
);
}

// ---------------------------------------------------------------------------
// durableRace — first child to complete wins, others cancelled
// ---------------------------------------------------------------------------

/**
* Race multiple durable workflows. The first to complete wins;
* remaining children are cancelled.
*
* Each child gets a deterministic coroutine ID. When the winner
* completes, Effection cancels the remaining children via
* iterator.return(). The runDurableChild wrapper detects this
* (closeEvent is undefined in the finally block) and emits
* Close(cancelled) for each loser.
*
* On replay, children with Close(cancelled) in the journal suspend
* indefinitely (yield* suspend()), letting the parent race cancel
* them naturally — matching the original live behavior.
*
* See spec §10.
*/
export function durableRace<T extends WorkflowValue>(
workflows: (() => Workflow<T>)[],
): Workflow<T> {
return ephemeral(
(function* (): Operation<T> {
const scope = yield* useScope();
const ctx = scope.expect<DurableContext>(DurableCtx);

// Build Operations for each child — each gets its own coroutineId
// and Close event handling via runDurableChild.
const childOps: Operation<T>[] = workflows.map((workflow) => {
const childIndex = ctx.childCounter++;
const childId = `${ctx.coroutineId}.${childIndex}`;

return {
*[Symbol.iterator]() {
return yield* runDurableChild(workflow, childId, ctx);
},
};
});

// Use Effection's native race() which handles cancellation properly
return yield* effectionRace(childOps);
})(),
);
}
30 changes: 30 additions & 0 deletions durable-streams/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* DurableContext — the scope-local state for durable execution.
*
* Stored on each Effection scope via createContext(). Child scopes
* inherit the shared replayIndex and stream, but get their own
* coroutineId and childCounter.
*/

import { type Context, createContext } from "effection";
import type { ReplayIndex } from "./replay-index.ts";
import type { DurableStream } from "./stream.ts";
import type { CoroutineId } from "./types.ts";

export interface DurableContext {
/** Shared replay index (built from stream on startup). */
replayIndex: ReplayIndex;
/** Shared durable stream for appending events. */
stream: DurableStream;
/** This coroutine's hierarchical ID. */
coroutineId: CoroutineId;
/** Counter for assigning child IDs. */
childCounter: number;
}

/**
* Effection Context for durable execution state.
* Set on the root scope by durableRun(); inherited by child scopes.
*/
export const DurableCtx: Context<DurableContext> =
createContext<DurableContext>("@effection/durable");
80 changes: 80 additions & 0 deletions durable-streams/demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Durable Dinner Demo

A live demo of durable execution built on Effection. It runs a cooking workflow,
lets you hard-kill the process, then restart and watch it replay from the
journal without duplicating completed work.

## Prerequisites

- Node.js 22+
- `tmux` 3.x+ (`brew install tmux`)

## Quick Start (tmux launcher)

From `durable-streams/`:

```sh
./demo/start.sh
```

Or with a custom stream ID:

```sh
./demo/start.sh my-stream
```

This opens a `tmux` session named `durable-dinner` with 3 panes:

```text
┌─────────────────────┬──────────────────┐
│ │ Cook (focused) │
│ Observer ├──────────────────┤
│ (server + journal) │ Control │
│ │ (kill cmd) │
└─────────────────────┴──────────────────┘
```

- **Observer** (`demo/observe.ts`) — starts the server and tails the journal
via SSE, printing color-coded events as they arrive
- **Cook** (`demo/cook.ts`) — the durable cooking workflow (focused, press Enter)
- **Control** — pre-typed kill command to simulate a crash

## Demo Script

1. Start with `./demo/start.sh`
2. In the cook pane, press Enter to run the workflow
3. Watch color-coded journal events stream in the observer pane
4. In the control pane, press Enter to hard-kill the cook process
5. Back in the cook pane, rerun:

```sh
node --experimental-strip-types demo/cook.ts
```

You should see:

- `Found N events in journal — replaying...`
- No new observer events during replay
- New events only after replay catches up and live execution resumes

## Run Without tmux

Open 2 terminals from `durable-streams/`:

Terminal 1 (observer — server + tailer):

```sh
DURABLE_STREAM_ID=my-stream node --experimental-strip-types demo/observe.ts
```

Terminal 2 (workflow):

```sh
DURABLE_STREAM_ID=my-stream node --experimental-strip-types demo/cook.ts
```

Then kill and restart Terminal 2 to observe replay behavior.

## Environment Variables

- `DURABLE_STREAM_ID` (default: `dinner-demo`)
Loading
Loading