From 9f5945c6680a4ef0ab45232138bf77839c962565 Mon Sep 17 00:00:00 2001 From: iRonin Date: Thu, 11 Jun 2026 10:35:05 -0400 Subject: [PATCH] fix: use caller-supplied stable session id as the broker routing identity The broker assigned a fresh randomUUID() on every register, so the routing identity changed on every reconnect. A peer that resolved a name to an id (or sent by id) would hit "Session not found" once the target reconnected with a new id — even though the target was alive and named. Let the client supply a stable id (the pi session id) in the register message; the broker uses it as the routing identity when present and falls back to randomUUID for older clients. Reconnects keep the same id, so previously resolved ids stay valid. As a side effect the list/parens id, any status surface, and the subagent-chat-* fallback alias all converge on one stable id. Also harden the socket-close handler: only evict a registry entry if it still points at the closing socket, so a reconnect that rebinds the id before the old socket's close fires is not torn down by that late close. Test: a worker connects with a stable id, a peer resolves it, the worker reconnects (new socket, same id), and the peer's send to the SAME id still delivers. Verified the test fails with per-connect random ids. --- broker/broker.ts | 24 ++++++++++++----- broker/client.ts | 5 ++-- index.ts | 4 ++- intercom.integration.test.ts | 52 ++++++++++++++++++++++++++++++++++++ types.ts | 5 +++- 5 files changed, 80 insertions(+), 10 deletions(-) diff --git a/broker/broker.ts b/broker/broker.ts index a2e1cc5..88920d5 100644 --- a/broker/broker.ts +++ b/broker/broker.ts @@ -135,10 +135,15 @@ class IntercomBroker { socket.on("close", () => { if (sessionId) { - this.sessions.delete(sessionId); - this.broadcast({ type: "session_left", sessionId }, sessionId); - - this.scheduleShutdownCheck(); + // Only tear down the registry entry if it still points at THIS socket. 
+ // With stable ids a reconnect may have already rebound this id to a new + // socket; the late close of the old socket must not evict the new one. + const existing = this.sessions.get(sessionId); + if (!existing || existing.socket === socket) { + this.sessions.delete(sessionId); + this.broadcast({ type: "session_left", sessionId }, sessionId); + this.scheduleShutdownCheck(); + } } }); @@ -184,10 +189,17 @@ class IntercomBroker { if (currentId) { throw new Error("Received duplicate register message"); } - - const id = randomUUID(); + + // Prefer a caller-supplied stable id (the pi session id) so the routing + // identity survives reconnects; a peer that resolved name->id keeps a + // valid target instead of hitting "Session not found" after the target + // reconnects with a fresh id. Fall back to a random id for older clients. + const requestedId = typeof clientMessage.sessionId === "string" ? clientMessage.sessionId.trim() : ""; + const id = requestedId || randomUUID(); setId(id); const info: SessionInfo = { ...clientMessage.session, id }; + // A reconnect can arrive before the previous socket's close fires; the + // newest socket wins for this id. 
this.sessions.set(id, { socket, info }); if (this.shutdownTimer) { diff --git a/broker/client.ts b/broker/client.ts index a6647a7..e707342 100644 --- a/broker/client.ts +++ b/broker/client.ts @@ -149,10 +149,11 @@ export class IntercomClient extends EventEmitter { return socket; } - connect(session: Omit): Promise { + connect(session: Omit, sessionId?: string): Promise { if (this.socket) { return Promise.reject(new Error("Already connected")); } + const stableSessionId = sessionId?.trim() || undefined; return new Promise((resolve, reject) => { const socket = net.connect(BROKER_SOCKET); @@ -254,7 +255,7 @@ export class IntercomClient extends EventEmitter { this.once("_registered", onRegistered); try { - writeMessage(socket, { type: "register", session }); + writeMessage(socket, { type: "register", session, ...(stableSessionId ? { sessionId: stableSessionId } : {}) }); } catch (error) { cleanupConnectionAttempt(); cleanupSocketListeners(); diff --git a/index.ts b/index.ts index 81340c0..18ec237 100644 --- a/index.ts +++ b/index.ts @@ -756,7 +756,9 @@ export default function piIntercomExtension(pi: ExtensionAPI) { attachClientHandlers(nextClient); try { await spawnBrokerIfNeeded(config.brokerCommand, config.brokerArgs); - await nextClient.connect(buildRegistration()); + // Pass the pi session id as the stable broker id so the routing identity + // survives reconnects (a peer's resolved name->id stays valid). + await nextClient.connect(buildRegistration(), currentSessionId ?? 
undefined); if (!getLiveContext(contextAtStart, generationAtStart)) { await nextClient.disconnect(); throw new Error("Intercom runtime no longer active"); diff --git a/intercom.integration.test.ts b/intercom.integration.test.ts index 69dce14..b375b48 100644 --- a/intercom.integration.test.ts +++ b/intercom.integration.test.ts @@ -394,6 +394,58 @@ test("sessions publish automatic lifecycle status", { concurrency: false }, asyn } }); +test("a caller-supplied stable id survives reconnect so id-routing keeps working", { concurrency: false }, async () => { + // Regression for the "Session not found" race: a peer resolves name->id and + // sends by id, but the target reconnects with a fresh id in between. With a + // caller-supplied stable id (the pi session id), the broker id is unchanged + // across reconnects, so the previously-resolved id still routes. A test that + // only sent once would pass even with per-connect random ids — the point is + // the SECOND send, to the SAME id, AFTER a reconnect. + const STABLE_ID = "session-stable-abc12345"; + const { planner, cleanup } = await setupClients(); + + const connectWorker = async () => { + const worker = new IntercomClient(); + await worker.connect({ + name: "stable-worker", + cwd: repoDir, + model: "test-model", + pid: process.pid, + startedAt: Date.now(), + lastActivity: Date.now(), + }, STABLE_ID); + return worker; + }; + const nextMessage = (c: InstanceType): Promise => + new Promise((resolve, reject) => { + const timer = setTimeout(() => { c.off("message", h); reject(new Error("no message")); }, 4000); + const h = (_from: SessionInfo, m: Message) => { clearTimeout(timer); c.off("message", h); resolve(m); }; + c.on("message", h); + }); + + let worker = await connectWorker(); + try { + // Broker honored the supplied id. 
+ assert.equal(worker.sessionId, STABLE_ID); + const seen = await waitForSessionByName(planner, "stable-worker"); + assert.equal(seen.id, STABLE_ID); + + // Reconnect the worker (new socket) with the SAME stable id. + await worker.disconnect(); + worker = await connectWorker(); + assert.equal(worker.sessionId, STABLE_ID); + + // The id resolved before the reconnect still routes. + const inbound = nextMessage(worker); + const result = await planner.send(STABLE_ID, { text: "still reachable" }); + assert.equal(result.delivered, true); + assert.equal((await inbound).content.text, "still reachable"); + } finally { + await worker.disconnect().catch(() => undefined); + await cleanup(); + } +}); + test("busy interactive sessions idle-gate top-level asks without aborting", { concurrency: false }, async () => { const { default: piIntercomExtension } = await import("./index.ts"); const { planner, cleanup } = await setupClients(); diff --git a/types.ts b/types.ts index f597c90..89b07dd 100644 --- a/types.ts +++ b/types.ts @@ -28,7 +28,10 @@ export interface Attachment { } export type ClientMessage = - | { type: "register"; session: Omit } + // sessionId: optional caller-supplied stable id (the pi session id). When + // present the broker uses it as the routing identity so it survives + // reconnects; older clients omit it and the broker assigns a random id. + | { type: "register"; session: Omit; sessionId?: string } | { type: "unregister" } | { type: "list"; requestId: string } | { type: "send"; to: string; message: Message }