Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 126 additions & 37 deletions packages/opencode/test/effect/runner.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,73 @@
import { describe, expect, test } from "bun:test"
import { Cause, Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
import { Cause, Clock, Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
import { Runner } from "../../src/effect"
import type { Runner as RunnerInstance, State } from "../../src/effect/runner"
import { it } from "../lib/effect"

interface DeferredRuntimeWaiters {
readonly resumes?: ReadonlyArray<unknown>
}

function waitUntil(message: () => string, ready: () => boolean) {
return Effect.gen(function* () {
const started = yield* Clock.currentTimeMillis
while (!ready()) {
const now = yield* Clock.currentTimeMillis
if (now - started > 1_000) {
return yield* Effect.die(new Error(message()))
}
yield* Effect.sleep("1 millis")
}
})
}

function waitForRunnerState<A, E>(runner: RunnerInstance<A, E>, tag: State<A, E>["_tag"]) {
return waitUntil(
() => `Runner did not enter ${tag}; current state is ${runner.state._tag}`,
() => runner.state._tag === tag,
)
}

// Effect Deferred does not expose waiter counts publicly. Keep this test-only
// runtime-shape check private to the shared-run tests that need to prove a
// second caller is waiting on the existing run before cancel/release.
function deferredWaiterCount<A, E>(deferred: Deferred.Deferred<A, E>) {
return (deferred as DeferredRuntimeWaiters).resumes?.length ?? 0
}

function waitForSharedRunWaiters<A, E>(deferred: Deferred.Deferred<A, E>, count: number, label: string) {
return waitUntil(
() => `${label} did not attach ${count} waiter(s); current waiters=${deferredWaiterCount(deferred)}`,
() => deferredWaiterCount(deferred) >= count,
)
}

function currentRunDone<A, E>(runner: RunnerInstance<A, E>) {
return Effect.gen(function* () {
const state = runner.state
if (state._tag === "Running" || state._tag === "ShellThenRun") return state.run.done
return yield* Effect.die(new Error(`Runner has no current run; current state is ${state._tag}`))
})
}

function makeBlockedWork<A>(value: A, label = "running work") {
return Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const release = yield* Deferred.make<void>()
return {
waitUntilStarted: Deferred.await(started).pipe(
Effect.timeoutOrElse({
duration: "1 second",
orElse: () => Effect.die(new Error(`${label} did not start`)),
}),
),
work: Effect.uninterruptibleMask((restore) =>
Deferred.succeed(started, undefined).pipe(Effect.andThen(restore(Deferred.await(release))), Effect.as(value)),
),
}
})
}

describe("Runner", () => {
// --- ensureRunning semantics ---

Expand Down Expand Up @@ -115,8 +180,10 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s)
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("never")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted
expect(runner.busy).toBe(true)
expect(runner.state._tag).toBe("Running")

Expand All @@ -143,8 +210,10 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s, { onInterrupt: () => Effect.succeed("fallback") })
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("never")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted

yield* runner.cancel

Expand All @@ -161,8 +230,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, {
onInterrupt: (meta) => Effect.succeed(`${meta?.source}:${meta?.reason}:${typeof meta?.recordedAt}`),
})
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("never")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted

yield* runner.cancel

Expand All @@ -179,8 +250,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, {
onInterrupt: (meta) => Effect.succeed(`${meta?.source}:${meta?.reason}:${typeof meta?.recordedAt}`),
})
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("never")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted

yield* Scope.close(s, Exit.void)

Expand All @@ -197,11 +270,14 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s, { onInterrupt: () => Effect.succeed("fallback") })
const blocked = yield* makeBlockedWork("x")

const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const a = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted
const done = yield* currentRunDone(runner)
const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForSharedRunWaiters(done, 2, "running run")

yield* runner.cancel

Expand All @@ -221,16 +297,17 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, {
onInterrupt: (meta) => Effect.succeed(meta?.reason ?? "missing"),
})
const work = Effect.never.pipe(
const blocked = yield* makeBlockedWork("never")
const work = blocked.work.pipe(
Effect.onInterrupt(() => Deferred.await(release)),
Effect.as("never"),
)

const fiber = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted

yield* runner.cancelWith({ source: "first", reason: "first" }).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Idle")
yield* runner.cancelWith({ source: "second", reason: "second" })
yield* Deferred.succeed(release, undefined)

Expand All @@ -245,8 +322,10 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s)
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("x")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted
yield* runner.cancel
yield* Fiber.await(fiber)

Expand Down Expand Up @@ -326,8 +405,10 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s)
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("x")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted

const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
Expand All @@ -345,7 +426,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()

const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")

const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
Expand All @@ -366,7 +447,7 @@ describe("Runner", () => {
})

const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")

const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
Expand All @@ -385,7 +466,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()

const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")

const stop = yield* runner.cancel.pipe(Effect.forkChild)
const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
Expand All @@ -404,11 +485,13 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s, { onInterrupt: () => Effect.succeed("interrupted") })
const blocked = yield* makeBlockedWork("ignored", "defective shell")

const sh = yield* runner
.startShell(Effect.never.pipe(Effect.ensuring(Effect.die("boom")), Effect.as("ignored")))
.startShell(blocked.work.pipe(Effect.ensuring(Effect.die("boom"))))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")
yield* blocked.waitUntilStarted

yield* runner.cancel
expect(Exit.isFailure(yield* Fiber.await(sh))).toBe(true)
Expand All @@ -420,11 +503,13 @@ describe("Runner", () => {
Effect.gen(function* () {
const s = yield* Scope.Scope
const runner = Runner.make<string, string>(s, { onInterrupt: () => Effect.succeed("interrupted") })
const blocked = yield* makeBlockedWork("ignored", "typed failing shell")

const sh = yield* runner
.startShell(Effect.never.pipe(Effect.onInterrupt(() => Effect.fail("boom")), Effect.as("ignored")))
.startShell(blocked.work.pipe(Effect.onInterrupt(() => Effect.fail("boom"))))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")
yield* blocked.waitUntilStarted

yield* runner.cancel
const exit = yield* Fiber.await(sh)
Expand All @@ -445,11 +530,11 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()

const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")
expect(runner.state._tag).toBe("Shell")

const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "ShellThenRun")
expect(runner.state._tag).toBe("ShellThenRun")

yield* Deferred.succeed(gate, undefined)
Expand All @@ -471,15 +556,17 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()

const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")

const work = Effect.gen(function* () {
yield* Ref.update(calls, (n) => n + 1)
return "run"
})
const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "ShellThenRun")
const done = yield* currentRunDone(runner)
const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForSharedRunWaiters(done, 2, "queued shell run")

yield* Deferred.succeed(gate, undefined)
yield* Fiber.await(sh)
Expand All @@ -498,10 +585,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s)

const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")

const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "ShellThenRun")
expect(runner.state._tag).toBe("ShellThenRun")

yield* runner.cancel
Expand Down Expand Up @@ -536,8 +623,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, {
onIdle: Ref.update(count, (n) => n + 1),
})
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
const blocked = yield* makeBlockedWork("x")
const fiber = yield* runner.ensureRunning(blocked.work).pipe(Effect.forkChild)
yield* waitForRunnerState(runner, "Running")
yield* blocked.waitUntilStarted
yield* runner.cancel
yield* Fiber.await(fiber)
expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
Expand Down Expand Up @@ -567,7 +656,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()

const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Running")
expect(runner.busy).toBe(true)

yield* Deferred.succeed(gate, undefined)
Expand All @@ -584,7 +673,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()

const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForRunnerState(runner, "Shell")
expect(runner.busy).toBe(true)

yield* Deferred.succeed(gate, undefined)
Expand Down
Loading