Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions broker/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,15 @@ class IntercomBroker {

socket.on("close", () => {
if (sessionId) {
this.sessions.delete(sessionId);
this.broadcast({ type: "session_left", sessionId }, sessionId);

this.scheduleShutdownCheck();
// Only tear down the registry entry if it still points at THIS socket.
// With stable ids a reconnect may have already rebound this id to a new
// socket; the late close of the old socket must not evict the new one.
const existing = this.sessions.get(sessionId);
if (!existing || existing.socket === socket) {
this.sessions.delete(sessionId);
this.broadcast({ type: "session_left", sessionId }, sessionId);
this.scheduleShutdownCheck();
}
}
});

Expand Down Expand Up @@ -184,10 +189,17 @@ class IntercomBroker {
if (currentId) {
throw new Error("Received duplicate register message");
}

const id = randomUUID();

// Prefer a caller-supplied stable id (the pi session id) so the routing
// identity survives reconnects; a peer that resolved name->id keeps a
// valid target instead of hitting "Session not found" after the target
// reconnects with a fresh id. Fall back to a random id for older clients.
const requestedId = typeof clientMessage.sessionId === "string" ? clientMessage.sessionId.trim() : "";
const id = requestedId || randomUUID();
setId(id);
const info: SessionInfo = { ...clientMessage.session, id };
// A reconnect can arrive before the previous socket's close fires; the
// newest socket wins for this id.
this.sessions.set(id, { socket, info });

if (this.shutdownTimer) {
Expand Down
5 changes: 3 additions & 2 deletions broker/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,11 @@ export class IntercomClient extends EventEmitter {
return socket;
}

connect(session: Omit<SessionInfo, "id">): Promise<void> {
connect(session: Omit<SessionInfo, "id">, sessionId?: string): Promise<void> {
if (this.socket) {
return Promise.reject(new Error("Already connected"));
}
const stableSessionId = sessionId?.trim() || undefined;

return new Promise((resolve, reject) => {
const socket = net.connect(BROKER_SOCKET);
Expand Down Expand Up @@ -254,7 +255,7 @@ export class IntercomClient extends EventEmitter {
this.once("_registered", onRegistered);

try {
writeMessage(socket, { type: "register", session });
writeMessage(socket, { type: "register", session, ...(stableSessionId ? { sessionId: stableSessionId } : {}) });
} catch (error) {
cleanupConnectionAttempt();
cleanupSocketListeners();
Expand Down
4 changes: 3 additions & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,9 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
attachClientHandlers(nextClient);
try {
await spawnBrokerIfNeeded(config.brokerCommand, config.brokerArgs);
await nextClient.connect(buildRegistration());
// Pass the pi session id as the stable broker id so the routing identity
// survives reconnects (a peer's resolved name->id stays valid).
await nextClient.connect(buildRegistration(), currentSessionId ?? undefined);
if (!getLiveContext(contextAtStart, generationAtStart)) {
await nextClient.disconnect();
throw new Error("Intercom runtime no longer active");
Expand Down
52 changes: 52 additions & 0 deletions intercom.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,58 @@ test("sessions publish automatic lifecycle status", { concurrency: false }, asyn
}
});

test("a caller-supplied stable id survives reconnect so id-routing keeps working", { concurrency: false }, async () => {
// Regression for the "Session not found" race: a peer resolves name->id and
// sends by id, but the target reconnects with a fresh id in between. With a
// caller-supplied stable id (the pi session id), the broker id is unchanged
// across reconnects, so the previously-resolved id still routes. A test that
// only sent once would pass even with per-connect random ids — the point is
// the SECOND send, to the SAME id, AFTER a reconnect.
const STABLE_ID = "session-stable-abc12345";
const { planner, cleanup } = await setupClients();

const connectWorker = async () => {
const worker = new IntercomClient();
await worker.connect({
name: "stable-worker",
cwd: repoDir,
model: "test-model",
pid: process.pid,
startedAt: Date.now(),
lastActivity: Date.now(),
}, STABLE_ID);
return worker;
};
const nextMessage = (c: InstanceType<typeof IntercomClient>): Promise<Message> =>
new Promise((resolve, reject) => {
const timer = setTimeout(() => { c.off("message", h); reject(new Error("no message")); }, 4000);
const h = (_from: SessionInfo, m: Message) => { clearTimeout(timer); c.off("message", h); resolve(m); };
c.on("message", h);
});

let worker = await connectWorker();
try {
// Broker honored the supplied id.
assert.equal(worker.sessionId, STABLE_ID);
const seen = await waitForSessionByName(planner, "stable-worker");
assert.equal(seen.id, STABLE_ID);

// Reconnect the worker (new socket) with the SAME stable id.
await worker.disconnect();
worker = await connectWorker();
assert.equal(worker.sessionId, STABLE_ID);

// The id resolved before the reconnect still routes.
const inbound = nextMessage(worker);
const result = await planner.send(STABLE_ID, { text: "still reachable" });
assert.equal(result.delivered, true);
assert.equal((await inbound).content.text, "still reachable");
} finally {
await worker.disconnect().catch(() => undefined);
await cleanup();
}
});

test("busy interactive sessions idle-gate top-level asks without aborting", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { planner, cleanup } = await setupClients();
Expand Down
5 changes: 4 additions & 1 deletion types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ export interface Attachment {
}

export type ClientMessage =
| { type: "register"; session: Omit<SessionInfo, "id"> }
// sessionId: optional caller-supplied stable id (the pi session id). When
// present the broker uses it as the routing identity so it survives
// reconnects; older clients omit it and the broker assigns a random id.
| { type: "register"; session: Omit<SessionInfo, "id">; sessionId?: string }
| { type: "unregister" }
| { type: "list"; requestId: string }
| { type: "send"; to: string; message: Message }
Expand Down