diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b845f0..8f2c3cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to the `pi-intercom` extension will be documented in this fi ## [Unreleased] +### Fixed +- Forked/replaced sessions now register with the broker. The runtime was only initialized on `session_start`; when a session's id changed in-process without a fresh `session_start` (forking/branching a session, or a resume path that adopts an id), it never (re-)registered and stayed unreachable until a full process restart. Re-initialize on the first turn whose live session id diverges from the registered identity (or when no `session_start` ever fired). + ## [0.6.0] - 2026-05-03 ### Added diff --git a/index.ts b/index.ts index 81340c0..f9e6d48 100644 --- a/index.ts +++ b/index.ts @@ -907,10 +907,11 @@ export default function piIntercomExtension(pi: ExtensionAPI) { acknowledge: true, }); }); - pi.on("session_start", (_event, ctx) => { - if (!config.enabled) { - return; - } + // Initialize (or re-initialize) the intercom runtime for `ctx`'s current + // session. Used by `session_start` and by the fork/replace recovery path in + // `adoptSessionContext` when the session id changes without a fresh + // `session_start` (e.g. forking/branching a session in-process). + function startSessionRuntime(ctx: ExtensionContext): void { shuttingDown = false; disposed = false; runtimeStarted = true; @@ -918,6 +919,14 @@ export default function piIntercomExtension(pi: ExtensionAPI) { reconnectAttempt = 0; clearReconnectTimer(); clearStartupConnectTimer(); + // A fork/replace can swap the session id mid-process without a fresh + // session_start; drop any client still registered under the previous + // identity so we re-register cleanly under the new session id. + if (client) { + const staleClient = client; + client = null; + void staleClient.disconnect().catch(() => {}); + } runtimeContext = ctx; currentSessionId = ctx.sessionManager.getSessionId(); currentModel = ctx.model?.id ?? "unknown"; @@ -938,6 +947,43 @@ export default function piIntercomExtension(pi: ExtensionAPI) { scheduleReconnect(); }); }, 0); + } + // Detect an in-process session fork/replace and (re-)initialize so the broker + // registration follows the live session. This covers two cases that otherwise + // strand a session until a full process restart: + // 1. the runtime never started because no `session_start` fired for this + // session (a fork/resume path that adopts an id silently), and + // 2. the live session id diverged from the registered identity. + // In case 2 the stale-context guard in `getLiveContext` would reject every + // lifecycle handler, and `currentSessionId` is only advanced inside those + // (now-rejected) handlers — so without this the id can never catch up. + // This is only called from turn-level events, which fire exclusively for a + // live session, so the `disposed`/`shuttingDown` flags (true both before the + // first session_start and after a real shutdown) are intentionally not used + // to gate adoption — startSessionRuntime resets them. + // Returns true when a re-init was triggered and the caller should stop + // processing the current event (the runtime is being rebuilt asynchronously). + function adoptSessionContext(ctx: ExtensionContext): boolean { + if (!config.enabled) { + return false; + } + let liveSessionId: string; + try { + liveSessionId = ctx.sessionManager.getSessionId(); + } catch { + return false; + } + if (!runtimeStarted || (currentSessionId !== null && liveSessionId !== currentSessionId)) { + startSessionRuntime(ctx); + return true; + } + return false; + } + pi.on("session_start", (_event, ctx) => { + if (!config.enabled) { + return; + } + startSessionRuntime(ctx); }); pi.on("session_shutdown", async () => { @@ -999,6 +1045,12 @@ export default function piIntercomExtension(pi: ExtensionAPI) { scheduleInboundFlush(0); }); pi.on("turn_start", (_event, ctx) => { + // Recover from an in-process fork/replace (or a missing session_start) + // before the stale-context guard rejects this event and strands the + // session under its previous identity. + if (adoptSessionContext(ctx)) { + return; + } if (!getLiveContext(ctx)) { return; } diff --git a/intercom.integration.test.ts b/intercom.integration.test.ts index 69dce14..9eb7cd9 100644 --- a/intercom.integration.test.ts +++ b/intercom.integration.test.ts @@ -127,7 +127,10 @@ function createExtensionHarness(sessionName = "child-worker", options: { hasUI?: boolean; isIdle?: () => boolean; ui?: unknown; + sessionId?: string; + getSessionName?: () => string | undefined; } = {}) { + let harnessSessionId = options.sessionId ?? "session-child-test"; const events = new EventEmitter(); const lifecycleHandlers = new Map unknown>>(); const commands = new Map unknown>(); @@ -135,7 +138,7 @@ function createExtensionHarness(sessionName = "child-worker", options: { const entries: Array<{ type: string; data: unknown }> = []; const sentMessages: Array<{ message: { customType?: string; content?: string; details?: unknown }; options?: { triggerTurn?: boolean; deliverAs?: string } }> = []; const pi = { - getSessionName: () => sessionName, + getSessionName: options.getSessionName ?? (() => sessionName), events: { on: (channel: string, handler: (payload: unknown) => void) => { events.on(channel, handler); @@ -164,7 +167,7 @@ function createExtensionHarness(sessionName = "child-worker", options: { const ctx = { cwd: repoDir, model: { id: "child-model" }, - sessionManager: { getSessionId: () => "session-child-test" }, + sessionManager: { getSessionId: () => harnessSessionId }, isIdle: options.isIdle ?? (() => true), hasUI: options.hasUI ?? false, abort: options.abort ?? (() => undefined), @@ -177,6 +180,9 @@ function createExtensionHarness(sessionName = "child-worker", options: { commands, entries, sentMessages, + setSessionId(id: string) { + harnessSessionId = id; + }, async emitLifecycle(event: string, payload: unknown = {}, eventContext: unknown = ctx) { for (const handler of lifecycleHandlers.get(event) ?? []) { await handler(payload, eventContext); @@ -262,6 +268,19 @@ async function waitForSessionByName(client: InstanceType, throw new Error(`Timed out waiting for ${name}; saw ${JSON.stringify(sessions.map((session) => session.name))}`); } +async function waitForSessionNameSuffix(client: InstanceType, suffix: string): Promise { + const deadline = Date.now() + 2000; + while (Date.now() < deadline) { + const session = (await client.listSessions()).find((candidate) => candidate.name?.endsWith(suffix)); + if (session) { + return session; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + const sessions = await client.listSessions(); + throw new Error(`Timed out waiting for session name *${suffix}; saw ${JSON.stringify(sessions.map((session) => session.name))}`); +} + async function waitForSessionStatus(client: InstanceType, name: string, status: string): Promise { const deadline = Date.now() + 2000; while (Date.now() < deadline) { @@ -454,6 +473,60 @@ test("deferred startup connect is cancelled on shutdown", { concurrency: false } } }); +test("a forked session that starts on its first turn (no session_start) registers with the broker", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { planner, cleanup } = await setupClients(); + // Reproduces the fork/resume path where the process adopts a session id but no + // session_start fires for it; the first lifecycle signal is a turn_start. + // Before the fix the runtime never initialized, so the session was invisible + // to peers until a full process restart. + const harness = createExtensionHarness("forked-first-turn", { hasUI: true, sessionId: "session-fork-1111" }); + try { + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("turn_start"); + const session = await waitForSessionByName(planner, "forked-first-turn"); + assert.equal(session.name, "forked-first-turn"); + } finally { + await harness.emitLifecycle("session_shutdown"); + await cleanup(); + } +}); + +test("an in-process session fork re-registers under the new identity", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { planner, cleanup } = await setupClients(); + // With no /name, the advertised presence name is derived from the session id, + // so a fork changes the advertised name — observable here. Before the fix the + // stale-context guard rejected every handler once the id diverged, so the + // session stayed stranded under its pre-fork identity. + const nameForId = (id: string) => (id.startsWith("session-") ? id.slice("session-".length) : id).slice(0, 8); + const harness = createExtensionHarness("ignored", { + hasUI: true, + sessionId: "session-AAAAAAAAAA", + getSessionName: () => undefined, + }); + try { + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + await waitForSessionNameSuffix(planner, nameForId("session-AAAAAAAAAA")); + + // Fork: the live session id changes without a fresh session_start. + harness.setSessionId("session-BBBBBBBBBB"); + await harness.emitLifecycle("turn_start"); + + await waitForSessionNameSuffix(planner, nameForId("session-BBBBBBBBBB")); + const sessions = await planner.listSessions(); + assert.equal( + sessions.some((session) => session.name?.endsWith(nameForId("session-AAAAAAAAAA"))), + false, + "old identity should no longer be registered after the fork", + ); + } finally { + await harness.emitLifecycle("session_shutdown"); + await cleanup(); + } +}); + test("stale overlay work stops after same-session restart", { concurrency: false }, async () => { const { default: piIntercomExtension } = await import("./index.ts"); const { planner, cleanup } = await setupClients();