Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to the `pi-intercom` extension will be documented in this fi

## [Unreleased]

### Fixed
- Forked/replaced sessions now register with the broker. The runtime was only initialized on `session_start`; when a session's id changed in-process without a fresh `session_start` (forking/branching a session, or a resume path that adopts an id), it never (re-)registered and stayed unreachable until a full process restart. Re-initialize on the first turn whose live session id diverges from the registered identity (or when no `session_start` ever fired).

## [0.6.0] - 2026-05-03

### Added
Expand Down
60 changes: 56 additions & 4 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -907,17 +907,26 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
acknowledge: true,
});
});
pi.on("session_start", (_event, ctx) => {
if (!config.enabled) {
return;
}
// Initialize (or re-initialize) the intercom runtime for `ctx`'s current
// session. Used by `session_start` and by the fork/replace recovery path in
// `adoptSessionContext` when the session id changes without a fresh
// `session_start` (e.g. forking/branching a session in-process).
function startSessionRuntime(ctx: ExtensionContext): void {
shuttingDown = false;
disposed = false;
runtimeStarted = true;
runtimeGeneration += 1;
reconnectAttempt = 0;
clearReconnectTimer();
clearStartupConnectTimer();
// A fork/replace can swap the session id mid-process without a fresh
// session_start; drop any client still registered under the previous
// identity so we re-register cleanly under the new session id.
if (client) {
const staleClient = client;
client = null;
void staleClient.disconnect().catch(() => {});
}
runtimeContext = ctx;
currentSessionId = ctx.sessionManager.getSessionId();
currentModel = ctx.model?.id ?? "unknown";
Expand All @@ -938,6 +947,43 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
scheduleReconnect();
});
}, 0);
}
// Detect an in-process session fork/replace and (re-)initialize so the broker
// registration follows the live session. This covers two cases that otherwise
// strand a session until a full process restart:
// 1. the runtime never started because no `session_start` fired for this
// session (a fork/resume path that adopts an id silently), and
// 2. the live session id diverged from the registered identity.
// In case 2 the stale-context guard in `getLiveContext` would reject every
// lifecycle handler, and `currentSessionId` is only advanced inside those
// (now-rejected) handlers — so without this the id can never catch up.
// This is only called from turn-level events, which fire exclusively for a
// live session, so the `disposed`/`shuttingDown` flags (true both before the
// first session_start and after a real shutdown) are intentionally not used
// to gate adoption — startSessionRuntime resets them.
// Returns true when a re-init was triggered and the caller should stop
// processing the current event (the runtime is being rebuilt asynchronously).
function adoptSessionContext(ctx: ExtensionContext): boolean {
if (!config.enabled) {
return false;
}
let liveSessionId: string;
try {
liveSessionId = ctx.sessionManager.getSessionId();
} catch {
return false;
}
if (!runtimeStarted || (currentSessionId !== null && liveSessionId !== currentSessionId)) {
startSessionRuntime(ctx);
return true;
}
return false;
}
pi.on("session_start", (_event, ctx) => {
if (!config.enabled) {
return;
}
startSessionRuntime(ctx);
});

pi.on("session_shutdown", async () => {
Expand Down Expand Up @@ -999,6 +1045,12 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
scheduleInboundFlush(0);
});
pi.on("turn_start", (_event, ctx) => {
// Recover from an in-process fork/replace (or a missing session_start)
// before the stale-context guard rejects this event and strands the
// session under its previous identity.
if (adoptSessionContext(ctx)) {
return;
}
if (!getLiveContext(ctx)) {
return;
}
Expand Down
77 changes: 75 additions & 2 deletions intercom.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,18 @@ function createExtensionHarness(sessionName = "child-worker", options: {
hasUI?: boolean;
isIdle?: () => boolean;
ui?: unknown;
sessionId?: string;
getSessionName?: () => string | undefined;
} = {}) {
let harnessSessionId = options.sessionId ?? "session-child-test";
const events = new EventEmitter();
const lifecycleHandlers = new Map<string, Array<(event: unknown, ctx: unknown) => unknown>>();
const commands = new Map<string, (args: string, ctx: unknown) => unknown>();
const tools: CapturedTool[] = [];
const entries: Array<{ type: string; data: unknown }> = [];
const sentMessages: Array<{ message: { customType?: string; content?: string; details?: unknown }; options?: { triggerTurn?: boolean; deliverAs?: string } }> = [];
const pi = {
getSessionName: () => sessionName,
getSessionName: options.getSessionName ?? (() => sessionName),
events: {
on: (channel: string, handler: (payload: unknown) => void) => {
events.on(channel, handler);
Expand Down Expand Up @@ -164,7 +167,7 @@ function createExtensionHarness(sessionName = "child-worker", options: {
const ctx = {
cwd: repoDir,
model: { id: "child-model" },
sessionManager: { getSessionId: () => "session-child-test" },
sessionManager: { getSessionId: () => harnessSessionId },
isIdle: options.isIdle ?? (() => true),
hasUI: options.hasUI ?? false,
abort: options.abort ?? (() => undefined),
Expand All @@ -177,6 +180,9 @@ function createExtensionHarness(sessionName = "child-worker", options: {
commands,
entries,
sentMessages,
setSessionId(id: string) {
harnessSessionId = id;
},
async emitLifecycle(event: string, payload: unknown = {}, eventContext: unknown = ctx) {
for (const handler of lifecycleHandlers.get(event) ?? []) {
await handler(payload, eventContext);
Expand Down Expand Up @@ -262,6 +268,19 @@ async function waitForSessionByName(client: InstanceType<typeof IntercomClient>,
throw new Error(`Timed out waiting for ${name}; saw ${JSON.stringify(sessions.map((session) => session.name))}`);
}

async function waitForSessionNameSuffix(client: InstanceType<typeof IntercomClient>, suffix: string): Promise<SessionInfo> {
const deadline = Date.now() + 2000;
while (Date.now() < deadline) {
const session = (await client.listSessions()).find((candidate) => candidate.name?.endsWith(suffix));
if (session) {
return session;
}
await new Promise((resolve) => setTimeout(resolve, 25));
}
const sessions = await client.listSessions();
throw new Error(`Timed out waiting for session name *${suffix}; saw ${JSON.stringify(sessions.map((session) => session.name))}`);
}

async function waitForSessionStatus(client: InstanceType<typeof IntercomClient>, name: string, status: string): Promise<SessionInfo> {
const deadline = Date.now() + 2000;
while (Date.now() < deadline) {
Expand Down Expand Up @@ -454,6 +473,60 @@ test("deferred startup connect is cancelled on shutdown", { concurrency: false }
}
});

test("a forked session that starts on its first turn (no session_start) registers with the broker", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { planner, cleanup } = await setupClients();
// Reproduces the fork/resume path where the process adopts a session id but no
// session_start fires for it; the first lifecycle signal is a turn_start.
// Before the fix the runtime never initialized, so the session was invisible
// to peers until a full process restart.
const harness = createExtensionHarness("forked-first-turn", { hasUI: true, sessionId: "session-fork-1111" });
try {
piIntercomExtension(harness.pi as never);
await harness.emitLifecycle("turn_start");
const session = await waitForSessionByName(planner, "forked-first-turn");
assert.equal(session.name, "forked-first-turn");
} finally {
await harness.emitLifecycle("session_shutdown");
await cleanup();
}
});

test("an in-process session fork re-registers under the new identity", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { planner, cleanup } = await setupClients();
// With no /name, the advertised presence name is derived from the session id,
// so a fork changes the advertised name — observable here. Before the fix the
// stale-context guard rejected every handler once the id diverged, so the
// session stayed stranded under its pre-fork identity.
const nameForId = (id: string) => (id.startsWith("session-") ? id.slice("session-".length) : id).slice(0, 8);
const harness = createExtensionHarness("ignored", {
hasUI: true,
sessionId: "session-AAAAAAAAAA",
getSessionName: () => undefined,
});
try {
piIntercomExtension(harness.pi as never);
await harness.emitLifecycle("session_start");
await waitForSessionNameSuffix(planner, nameForId("session-AAAAAAAAAA"));

// Fork: the live session id changes without a fresh session_start.
harness.setSessionId("session-BBBBBBBBBB");
await harness.emitLifecycle("turn_start");

await waitForSessionNameSuffix(planner, nameForId("session-BBBBBBBBBB"));
const sessions = await planner.listSessions();
assert.equal(
sessions.some((session) => session.name?.endsWith(nameForId("session-AAAAAAAAAA"))),
false,
"old identity should no longer be registered after the fork",
);
} finally {
await harness.emitLifecycle("session_shutdown");
await cleanup();
}
});

test("stale overlay work stops after same-session restart", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { planner, cleanup } = await setupClients();
Expand Down