diff --git a/index.ts b/index.ts index 81340c0..bbb5ceb 100644 --- a/index.ts +++ b/index.ts @@ -38,6 +38,13 @@ interface InboundMessageEntry { bodyText: string; } +interface ReplyWaiter { + from: string; + replyTo: string; + resolve: (message: Message) => void; + reject: (error: Error) => void; +} + type ContactSupervisorReason = "need_decision" | "progress_update" | "interview_request"; interface SupervisorInterviewQuestion extends Record { @@ -430,28 +437,24 @@ export default function piIntercomExtension(pi: ExtensionAPI) { const replyTracker = new ReplyTracker(); const pendingIdleMessages: InboundMessageEntry[] = []; let inboundFlushTimer: NodeJS.Timeout | null = null; - let replyWaiter: { - from: string; - replyTo: string; - resolve: (message: Message) => void; - reject: (error: Error) => void; - } | null = null; + const replyWaiters = new Map(); function waitForReply(from: string, replyTo: string, signal?: AbortSignal): Promise { - if (replyWaiter) { - return Promise.reject(new Error("Already waiting for a reply")); + if (replyWaiters.has(replyTo)) { + throw new Error(`Already waiting for reply ${replyTo}`); } if (signal?.aborted) { - return Promise.reject(new Error("Cancelled")); + throw new Error("Cancelled"); } return new Promise((resolve, reject) => { const timeout = setTimeout(() => { - rejectReplyWaiter(new Error(`No reply from "${from}" within 10 minutes`)); + rejectReplyWaiter(replyTo, new Error(`No reply from "${from}" within 10 minutes`)); }, 10 * 60 * 1000); + let waiter: ReplyWaiter; const cleanup = () => { clearTimeout(timeout); signal?.removeEventListener("abort", onAbort); - if (replyWaiter?.replyTo === replyTo) { - replyWaiter = null; + if (replyWaiters.get(replyTo) === waiter) { + replyWaiters.delete(replyTo); } }; const onAbort = () => { @@ -459,7 +462,7 @@ export default function piIntercomExtension(pi: ExtensionAPI) { reject(new Error("Cancelled")); }; signal?.addEventListener("abort", onAbort, { once: true }); - replyWaiter = { + waiter = { from, replyTo, resolve: (message) => { @@ -471,10 +474,16 @@ export default function piIntercomExtension(pi: ExtensionAPI) { reject(error); }, }; + replyWaiters.set(replyTo, waiter); }); } - function rejectReplyWaiter(error: Error): void { - replyWaiter?.reject(error); + function rejectReplyWaiter(replyTo: string, error: Error): void { + replyWaiters.get(replyTo)?.reject(error); + } + function rejectAllReplyWaiters(error: Error): void { + for (const waiter of [...replyWaiters.values()]) { + waiter.reject(error); + } } function clearReconnectTimer(): void { if (!reconnectTimer) { @@ -642,12 +651,15 @@ export default function piIntercomExtension(pi: ExtensionAPI) { if (!liveContext) { return; } - if (replyWaiter) { - const senderTarget = from.name || from.id; - const fromMatches = senderTarget.toLowerCase() === replyWaiter.from.toLowerCase() - || from.id === replyWaiter.from; - const replyMatches = message.replyTo === replyWaiter.replyTo; - if (fromMatches && replyMatches) { + if (message.replyTo) { + const replyWaiter = replyWaiters.get(message.replyTo); + if (replyWaiter) { + const senderTarget = from.name || from.id; + const fromMatches = senderTarget.toLowerCase() === replyWaiter.from.toLowerCase() + || from.id === replyWaiter.from; + if (!fromMatches) { + return; + } replyWaiter.resolve(message); return; } @@ -704,7 +716,7 @@ export default function piIntercomExtension(pi: ExtensionAPI) { if (client !== nextClient) { return; } - rejectReplyWaiter(new Error(`Disconnected while waiting for reply: ${error.message}`, { cause: error })); + rejectAllReplyWaiters(new Error(`Disconnected while waiting for reply: ${error.message}`, { cause: error })); client = null; if (!shuttingDown && !disposed) { clearReconnectTimer(); @@ -946,7 +958,7 @@ export default function piIntercomExtension(pi: ExtensionAPI) { runtimeGeneration += 1; clearStartupConnectTimer(); clearReconnectTimer(); - rejectReplyWaiter(new Error("Session shutting down")); + rejectAllReplyWaiters(new Error("Session shutting down")); replyTracker.reset(); pendingIdleMessages.length = 0; clearInboundFlushTimer(); @@ -1169,21 +1181,18 @@ export default function piIntercomExtension(pi: ExtensionAPI) { } } - if (replyWaiter) { - return { - content: [{ type: "text", text: "Already waiting for a reply" }], - isError: true, - details: { error: true }, - }; - } - let replyPromise: Promise | null = null; + let ownsReplyWaiter = false; + let questionId: string | null = null; try { - const questionId = randomUUID(); + questionId = randomUUID(); replyPromise = waitForReply(sendTo, questionId, signal); + ownsReplyWaiter = true; replyPromise.catch(() => undefined); if (signal?.aborted) { - rejectReplyWaiter(new Error("Cancelled")); + if (ownsReplyWaiter && questionId) { + rejectReplyWaiter(questionId, new Error("Cancelled")); + } try { await replyPromise; } catch { @@ -1205,7 +1214,9 @@ export default function piIntercomExtension(pi: ExtensionAPI) { }); if (!sendResult.delivered) { const errorText = sendResult.reason ?? "Session may not exist or has disconnected."; - rejectReplyWaiter(new Error(`Message to "${metadata.orchestratorTarget}" was not delivered: ${errorText}`)); + if (ownsReplyWaiter && questionId) { + rejectReplyWaiter(questionId, new Error(`Message to "${metadata.orchestratorTarget}" was not delivered: ${errorText}`)); + } if (replyPromise) { try { await replyPromise; @@ -1251,7 +1262,9 @@ export default function piIntercomExtension(pi: ExtensionAPI) { : {}), }; } catch (error) { - rejectReplyWaiter(toError(error)); + if (ownsReplyWaiter && questionId) { + rejectReplyWaiter(questionId, toError(error)); + } if (replyPromise) { try { await replyPromise; @@ -1464,14 +1477,6 @@ Usage: }; } - if (replyWaiter) { - return { - content: [{ type: "text", text: "Already waiting for a reply" }], - isError: true, - details: { error: true }, - }; - } - if (_signal?.aborted) { return { content: [{ type: "text", text: "Cancelled" }], @@ -1480,6 +1485,8 @@ Usage: }; } let replyPromise: Promise | null = null; + let ownsReplyWaiter = false; + let questionId: string | null = null; try { const sendTo = await resolveSessionTarget(connectedClient, to) ?? to; @@ -1497,8 +1504,10 @@ Usage: details: { error: true }, }; } - const questionId = randomUUID(); + questionId = randomUUID(); replyPromise = waitForReply(sendTo, questionId, _signal); + ownsReplyWaiter = true; + replyPromise.catch(() => undefined); const sendResult = await connectedClient.send(sendTo, { messageId: questionId, text: message, @@ -1509,7 +1518,9 @@ Usage: if (!sendResult.delivered) { const errorText = sendResult.reason ?? "Session may not exist or has disconnected."; - rejectReplyWaiter(new Error(`Message to "${to}" was not delivered: ${errorText}`)); + if (ownsReplyWaiter && questionId) { + rejectReplyWaiter(questionId, new Error(`Message to "${to}" was not delivered: ${errorText}`)); + } if (replyPromise) { try { await replyPromise; @@ -1545,7 +1556,9 @@ Usage: isError: false, }; } catch (error) { - rejectReplyWaiter(toError(error)); + if (ownsReplyWaiter && questionId) { + rejectReplyWaiter(questionId, toError(error)); + } if (replyPromise) { try { await replyPromise; diff --git a/intercom.integration.test.ts b/intercom.integration.test.ts index 69dce14..9e5a41b 100644 --- a/intercom.integration.test.ts +++ b/intercom.integration.test.ts @@ -354,6 +354,198 @@ test("contact supervisor tool renders reason and reply state", async () => { }); }); +test("concurrent intercom asks both deliver and resolve replies independently", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { planner, orchestrator, cleanup } = await setupClients(); + const harness = createExtensionHarness("ask-worker"); + const escapedErrors: unknown[] = []; + const plannerMessages: Array<{ from: SessionInfo; message: Message }> = []; + const orchestratorMessages: Array<{ from: SessionInfo; message: Message }> = []; + const onUnhandledRejection = (reason: unknown) => { + escapedErrors.push(reason); + }; + const onUncaughtException = (error: unknown) => { + escapedErrors.push(error); + }; + const onPlannerMessage = (from: SessionInfo, message: Message) => { + plannerMessages.push({ from, message }); + }; + const onOrchestratorMessage = (from: SessionInfo, message: Message) => { + orchestratorMessages.push({ from, message }); + }; + + process.prependListener("unhandledRejection", onUnhandledRejection); + process.prependListener("uncaughtException", onUncaughtException); + planner.on("message", onPlannerMessage); + orchestrator.on("message", onOrchestratorMessage); + + try { + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + await waitForSessionByName(planner, "ask-worker"); + + const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!; + const firstAsk = intercomTool.execute("ask-concurrent-1", { + action: "ask", + to: "planner", + message: "First concurrent question", + }, new AbortController().signal, undefined, harness.ctx); + const secondAsk = intercomTool.execute("ask-concurrent-2", { + action: "ask", + to: "orchestrator", + message: "Second concurrent question", + }, new AbortController().signal, undefined, harness.ctx); + + const deadline = Date.now() + 5000; + while (plannerMessages.length + orchestratorMessages.length < 2 && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + + assert.equal(plannerMessages.length, 1); + assert.equal(orchestratorMessages.length, 1); + await orchestrator.send(orchestratorMessages[0]!.from.id, { text: "orchestrator reply", replyTo: orchestratorMessages[0]!.message.id }); + await planner.send(plannerMessages[0]!.from.id, { text: "planner reply", replyTo: plannerMessages[0]!.message.id }); + + const results = await Promise.race([ + Promise.all([firstAsk, secondAsk]), + new Promise((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for concurrent asks")), 5000)), + ]); + await new Promise((resolve) => setImmediate(resolve)); + + assert.deepEqual(escapedErrors.map((error) => error instanceof Error ? error.message : String(error)), []); + assert.equal(results.filter((result) => result.isError).length, 0); + assert.match( + results[0]!.content[0]?.text ?? "", + /Reply from planner:[\s\S]*planner reply/, + ); + assert.match( + results[1]!.content[0]?.text ?? "", + /Reply from orchestrator:[\s\S]*orchestrator reply/, + ); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + process.off("uncaughtException", onUncaughtException); + planner.off("message", onPlannerMessage); + orchestrator.off("message", onOrchestratorMessage); + await harness.emitLifecycle("session_shutdown"); + await cleanup(); + } +}); + +test("intercom ask ignores a matching reply id from the wrong sender", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { planner, cleanup } = await setupClients(); + const imposter = new IntercomClient(); + const harness = createExtensionHarness("ask-worker"); + const plannerMessages: Array<{ from: SessionInfo; message: Message }> = []; + const onPlannerMessage = (from: SessionInfo, message: Message) => { + plannerMessages.push({ from, message }); + }; + planner.on("message", onPlannerMessage); + + try { + await imposter.connect({ + name: "imposter", + cwd: repoDir, + model: "test-model", + pid: process.pid, + startedAt: Date.now(), + lastActivity: Date.now(), + }); + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + await waitForSessionByName(planner, "ask-worker"); + + const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!; + const askResultPromise = intercomTool.execute("ask-wrong-sender", { + action: "ask", + to: "planner", + message: "Question for the real planner", + }, new AbortController().signal, undefined, harness.ctx); + + const deadline = Date.now() + 5000; + while (plannerMessages.length < 1 && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + assert.equal(plannerMessages.length, 1); + const [{ from, message }] = plannerMessages; + + const wrongReply = await imposter.send(from.id, { text: "spoofed reply", replyTo: message.id }); + assert.equal(wrongReply.delivered, true); + const raceResult = await Promise.race([ + askResultPromise.then(() => "resolved"), + new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 100)), + ]); + assert.equal(raceResult, "pending"); + + const correctReply = await planner.send(from.id, { text: "real reply", replyTo: message.id }); + assert.equal(correctReply.delivered, true); + const askResult = await askResultPromise; + assert.equal(askResult.isError, false); + assert.match(askResult.content[0]?.text ?? "", /real reply/); + } finally { + planner.off("message", onPlannerMessage); + await harness.emitLifecycle("session_shutdown"); + await imposter.disconnect().catch(() => undefined); + await cleanup(); + } +}); + +test("session shutdown rejects all concurrent intercom asks", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { planner, orchestrator, cleanup } = await setupClients(); + const harness = createExtensionHarness("ask-worker"); + const plannerMessages: Array<{ from: SessionInfo; message: Message }> = []; + const orchestratorMessages: Array<{ from: SessionInfo; message: Message }> = []; + const onPlannerMessage = (from: SessionInfo, message: Message) => { + plannerMessages.push({ from, message }); + }; + const onOrchestratorMessage = (from: SessionInfo, message: Message) => { + orchestratorMessages.push({ from, message }); + }; + planner.on("message", onPlannerMessage); + orchestrator.on("message", onOrchestratorMessage); + + try { + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + await waitForSessionByName(planner, "ask-worker"); + + const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!; + const firstAsk = intercomTool.execute("ask-shutdown-1", { + action: "ask", + to: "planner", + message: "First question before shutdown", + }, new AbortController().signal, undefined, harness.ctx); + const secondAsk = intercomTool.execute("ask-shutdown-2", { + action: "ask", + to: "orchestrator", + message: "Second question before shutdown", + }, new AbortController().signal, undefined, harness.ctx); + + const deadline = Date.now() + 5000; + while (plannerMessages.length + orchestratorMessages.length < 2 && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + assert.equal(plannerMessages.length, 1); + assert.equal(orchestratorMessages.length, 1); + + await harness.emitLifecycle("session_shutdown"); + const results = await Promise.race([ + Promise.all([firstAsk, secondAsk]), + new Promise((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for shutdown rejection")), 5000)), + ]); + assert.equal(results.filter((result) => result.isError).length, 2); + assert.match(results[0]!.content[0]?.text ?? "", /Session shutting down/); + assert.match(results[1]!.content[0]?.text ?? "", /Session shutting down/); + } finally { + planner.off("message", onPlannerMessage); + orchestrator.off("message", onOrchestratorMessage); + await harness.emitLifecycle("session_shutdown"); + await cleanup(); + } +}); + test("sessions publish automatic lifecycle status", { concurrency: false }, async () => { const { default: piIntercomExtension } = await import("./index.ts"); const { planner, cleanup } = await setupClients(); @@ -703,6 +895,87 @@ test("child supervisor tool resolves target and includes run metadata", { concur } }); +test("concurrent supervisor decisions both deliver and resolve replies independently", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { orchestrator, cleanup } = await setupClients(); + const escapedErrors: unknown[] = []; + const supervisorMessages: Array<{ from: SessionInfo; message: Message }> = []; + const onUnhandledRejection = (reason: unknown) => { + escapedErrors.push(reason); + }; + const onUncaughtException = (error: unknown) => { + escapedErrors.push(error); + }; + const onSupervisorMessage = (from: SessionInfo, message: Message) => { + supervisorMessages.push({ from, message }); + }; + + process.prependListener("unhandledRejection", onUnhandledRejection); + process.prependListener("uncaughtException", onUncaughtException); + orchestrator.on("message", onSupervisorMessage); + + try { + await withChildOrchestratorEnv({ + orchestratorTarget: "orchestrator", + runId: "78f659a3", + agent: "worker", + index: "0", + sessionName: "subagent-worker-78f659a3-1", + }, async () => { + const harness = createExtensionHarness("subagent-worker-78f659a3-1"); + piIntercomExtension(harness.pi as never); + try { + await harness.emitLifecycle("session_start"); + const supervisorTool = harness.tools.find((tool) => tool.name === "contact_supervisor")!; + + const firstDecision = supervisorTool.execute("supervisor-concurrent-1", { + reason: "need_decision", + message: "First supervisor decision?", + }, new AbortController().signal, undefined, harness.ctx); + const secondDecision = supervisorTool.execute("supervisor-concurrent-2", { + reason: "need_decision", + message: "Second supervisor decision?", + }, new AbortController().signal, undefined, harness.ctx); + + const deadline = Date.now() + 5000; + while (supervisorMessages.length < 2 && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + assert.equal(supervisorMessages.length, 2); + + const firstMessage = supervisorMessages.find(({ message }) => message.content.text.includes("First supervisor decision?"))!; + const secondMessage = supervisorMessages.find(({ message }) => message.content.text.includes("Second supervisor decision?"))!; + await orchestrator.send(secondMessage.from.id, { text: "Second supervisor reply.", replyTo: secondMessage.message.id }); + await orchestrator.send(firstMessage.from.id, { text: "First supervisor reply.", replyTo: firstMessage.message.id }); + + const results = await Promise.race([ + Promise.all([firstDecision, secondDecision]), + new Promise((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for supervisor decisions")), 5000)), + ]); + await new Promise((resolve) => setImmediate(resolve)); + + assert.deepEqual(escapedErrors.map((error) => error instanceof Error ? error.message : String(error)), []); + assert.equal(results.filter((result) => result.isError).length, 0); + assert.match( + results[0]!.content[0]?.text ?? "", + /Reply from supervisor:[\s\S]*First supervisor reply\./, + ); + assert.match( + results[1]!.content[0]?.text ?? "", + /Reply from supervisor:[\s\S]*Second supervisor reply\./, + ); + } finally { + await harness.emitLifecycle("session_shutdown"); + } + }); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + process.off("uncaughtException", onUncaughtException); + orchestrator.off("message", onSupervisorMessage); + await cleanup(); + } +}); + test("child supervisor tool rejects invalid reasons and interview payloads", async () => { const { default: piIntercomExtension } = await import("./index.ts");