diff --git a/index.ts b/index.ts index 81340c0..c70f7a7 100644 --- a/index.ts +++ b/index.ts @@ -65,6 +65,25 @@ function toError(error: unknown): Error { return error instanceof Error ? error : new Error(String(error)); } +class ReplyWaiterBusyError extends Error { + constructor() { + super("Already waiting for a reply"); + this.name = "ReplyWaiterBusyError"; + } +} + +function isReplyWaiterBusyError(error: unknown): error is ReplyWaiterBusyError { + return error instanceof ReplyWaiterBusyError; +} + +function alreadyWaitingForReplyResult() { + return { + content: [{ type: "text" as const, text: "Already waiting for a reply" }], + isError: true, + details: { error: true }, + }; +} + function formatAttachments(attachments: Attachment[]): string { let text = ""; for (const att of attachments) { @@ -438,10 +457,10 @@ export default function piIntercomExtension(pi: ExtensionAPI) { } | null = null; function waitForReply(from: string, replyTo: string, signal?: AbortSignal): Promise { if (replyWaiter) { - return Promise.reject(new Error("Already waiting for a reply")); + throw new ReplyWaiterBusyError(); } if (signal?.aborted) { - return Promise.reject(new Error("Cancelled")); + throw new Error("Cancelled"); } return new Promise((resolve, reject) => { const timeout = setTimeout(() => { @@ -1170,20 +1189,20 @@ export default function piIntercomExtension(pi: ExtensionAPI) { } if (replyWaiter) { - return { - content: [{ type: "text", text: "Already waiting for a reply" }], - isError: true, - details: { error: true }, - }; + return alreadyWaitingForReplyResult(); } let replyPromise: Promise | null = null; + let ownsReplyWaiter = false; try { const questionId = randomUUID(); replyPromise = waitForReply(sendTo, questionId, signal); + ownsReplyWaiter = true; replyPromise.catch(() => undefined); if (signal?.aborted) { - rejectReplyWaiter(new Error("Cancelled")); + if (ownsReplyWaiter) { + rejectReplyWaiter(new Error("Cancelled")); + } try { await replyPromise; } catch { @@ -1205,7 +1224,9 @@ export default function piIntercomExtension(pi: ExtensionAPI) { }); if (!sendResult.delivered) { const errorText = sendResult.reason ?? "Session may not exist or has disconnected."; - rejectReplyWaiter(new Error(`Message to "${metadata.orchestratorTarget}" was not delivered: ${errorText}`)); + if (ownsReplyWaiter) { + rejectReplyWaiter(new Error(`Message to "${metadata.orchestratorTarget}" was not delivered: ${errorText}`)); + } if (replyPromise) { try { await replyPromise; @@ -1251,7 +1272,12 @@ export default function piIntercomExtension(pi: ExtensionAPI) { : {}), }; } catch (error) { - rejectReplyWaiter(toError(error)); + if (isReplyWaiterBusyError(error)) { + return alreadyWaitingForReplyResult(); + } + if (ownsReplyWaiter) { + rejectReplyWaiter(toError(error)); + } if (replyPromise) { try { await replyPromise; @@ -1465,11 +1491,7 @@ Usage: } if (replyWaiter) { - return { - content: [{ type: "text", text: "Already waiting for a reply" }], - isError: true, - details: { error: true }, - }; + return alreadyWaitingForReplyResult(); } if (_signal?.aborted) { @@ -1480,6 +1502,7 @@ Usage: }; } let replyPromise: Promise | null = null; + let ownsReplyWaiter = false; try { const sendTo = await resolveSessionTarget(connectedClient, to) ?? to; @@ -1497,8 +1520,13 @@ Usage: details: { error: true }, }; } + if (replyWaiter) { + return alreadyWaitingForReplyResult(); + } const questionId = randomUUID(); replyPromise = waitForReply(sendTo, questionId, _signal); + ownsReplyWaiter = true; + replyPromise.catch(() => undefined); const sendResult = await connectedClient.send(sendTo, { messageId: questionId, text: message, @@ -1509,7 +1537,9 @@ Usage: if (!sendResult.delivered) { const errorText = sendResult.reason ?? "Session may not exist or has disconnected."; - rejectReplyWaiter(new Error(`Message to "${to}" was not delivered: ${errorText}`)); + if (ownsReplyWaiter) { + rejectReplyWaiter(new Error(`Message to "${to}" was not delivered: ${errorText}`)); + } if (replyPromise) { try { await replyPromise; @@ -1545,7 +1575,12 @@ Usage: isError: false, }; } catch (error) { - rejectReplyWaiter(toError(error)); + if (isReplyWaiterBusyError(error)) { + return alreadyWaitingForReplyResult(); + } + if (ownsReplyWaiter) { + rejectReplyWaiter(toError(error)); + } if (replyPromise) { try { await replyPromise; diff --git a/intercom.integration.test.ts b/intercom.integration.test.ts index 69dce14..f19def6 100644 --- a/intercom.integration.test.ts +++ b/intercom.integration.test.ts @@ -354,6 +354,86 @@ test("contact supervisor tool renders reason and reply state", async () => { }); }); +test("concurrent intercom asks fail safely without crashing or clobbering the winner", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { planner, orchestrator, cleanup } = await setupClients(); + const harness = createExtensionHarness("ask-worker"); + const escapedErrors: unknown[] = []; + const plannerMessages: Array<{ from: SessionInfo; message: Message }> = []; + const orchestratorMessages: Array<{ from: SessionInfo; message: Message }> = []; + const onUnhandledRejection = (reason: unknown) => { + escapedErrors.push(reason); + }; + const onUncaughtException = (error: unknown) => { + escapedErrors.push(error); + }; + const onPlannerMessage = (from: SessionInfo, message: Message) => { + plannerMessages.push({ from, message }); + }; + const onOrchestratorMessage = (from: SessionInfo, message: Message) => { + orchestratorMessages.push({ from, message }); + }; + + process.prependListener("unhandledRejection", onUnhandledRejection); + process.prependListener("uncaughtException", onUncaughtException); + planner.on("message", onPlannerMessage); + orchestrator.on("message", onOrchestratorMessage); + + try { + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + await waitForSessionByName(planner, "ask-worker"); + + const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!; + const firstAsk = intercomTool.execute("ask-concurrent-1", { + action: "ask", + to: "planner", + message: "First concurrent question", + }, new AbortController().signal, undefined, harness.ctx); + const secondAsk = intercomTool.execute("ask-concurrent-2", { + action: "ask", + to: "orchestrator", + message: "Second concurrent question", + }, new AbortController().signal, undefined, harness.ctx); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const deliveredMessages = [ + ...plannerMessages.map((entry) => ({ ...entry, client: planner, text: "planner reply" })), + ...orchestratorMessages.map((entry) => ({ ...entry, client: orchestrator, text: "orchestrator reply" })), + ]; + for (const { client, from, message, text } of deliveredMessages) { + await client.send(from.id, { text, replyTo: message.id }); + } + + const results = await Promise.race([ + Promise.all([firstAsk, secondAsk]), + new Promise((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for concurrent asks")), 5000)), + ]); + await new Promise((resolve) => setImmediate(resolve)); + + assert.deepEqual(escapedErrors.map((error) => error instanceof Error ? error.message : String(error)), []); + assert.equal(deliveredMessages.length, 1, "the losing ask must not send a message"); + assert.equal(results.filter((result) => result.isError).length, 1); + assert.equal(results.filter((result) => !result.isError).length, 1); + assert.match( + results.find((result) => result.isError)?.content[0]?.text ?? "", + /Already waiting for a reply/, + ); + assert.match( + results.find((result) => !result.isError)?.content[0]?.text ?? "", + /Reply from/, + ); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + process.off("uncaughtException", onUncaughtException); + planner.off("message", onPlannerMessage); + orchestrator.off("message", onOrchestratorMessage); + await harness.emitLifecycle("session_shutdown"); + await cleanup(); + } +}); + test("sessions publish automatic lifecycle status", { concurrency: false }, async () => { const { default: piIntercomExtension } = await import("./index.ts"); const { planner, cleanup } = await setupClients(); @@ -703,6 +783,81 @@ test("child supervisor tool resolves target and includes run metadata", { concur } }); +test("concurrent supervisor decisions fail safely without clobbering the winner", { concurrency: false }, async () => { + const { default: piIntercomExtension } = await import("./index.ts"); + const { orchestrator, cleanup } = await setupClients(); + const escapedErrors: unknown[] = []; + const supervisorMessages: Array<{ from: SessionInfo; message: Message }> = []; + const onUnhandledRejection = (reason: unknown) => { + escapedErrors.push(reason); + }; + const onUncaughtException = (error: unknown) => { + escapedErrors.push(error); + }; + const onSupervisorMessage = (from: SessionInfo, message: Message) => { + supervisorMessages.push({ from, message }); + }; + + process.prependListener("unhandledRejection", onUnhandledRejection); + process.prependListener("uncaughtException", onUncaughtException); + orchestrator.on("message", onSupervisorMessage); + + try { + await withChildOrchestratorEnv({ + orchestratorTarget: "orchestrator", + runId: "78f659a3", + agent: "worker", + index: "0", + sessionName: "subagent-worker-78f659a3-1", + }, async () => { + const harness = createExtensionHarness("subagent-worker-78f659a3-1"); + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + const supervisorTool = harness.tools.find((tool) => tool.name === "contact_supervisor")!; + + const firstDecision = supervisorTool.execute("supervisor-concurrent-1", { + reason: "need_decision", + message: "First supervisor decision?", + }, new AbortController().signal, undefined, harness.ctx); + const secondDecision = supervisorTool.execute("supervisor-concurrent-2", { + reason: "need_decision", + message: "Second supervisor decision?", + }, new AbortController().signal, undefined, harness.ctx); + + await new Promise((resolve) => setTimeout(resolve, 100)); + for (const { from, message } of supervisorMessages) { + await orchestrator.send(from.id, { text: "Supervisor reply.", replyTo: message.id }); + } + + const results = await Promise.race([ + Promise.all([firstDecision, secondDecision]), + new Promise((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for supervisor decisions")), 5000)), + ]); + await new Promise((resolve) => setImmediate(resolve)); + + assert.deepEqual(escapedErrors.map((error) => error instanceof Error ? error.message : String(error)), []); + assert.equal(supervisorMessages.length, 1, "the losing supervisor request must not send a message"); + assert.equal(results.filter((result) => result.isError).length, 1); + assert.equal(results.filter((result) => !result.isError).length, 1); + assert.match( + results.find((result) => result.isError)?.content[0]?.text ?? "", + /Already waiting for a reply/, + ); + assert.match( + results.find((result) => !result.isError)?.content[0]?.text ?? "", + /Reply from supervisor/, + ); + + await harness.emitLifecycle("session_shutdown"); + }); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + process.off("uncaughtException", onUncaughtException); + orchestrator.off("message", onSupervisorMessage); + await cleanup(); + } +}); + test("child supervisor tool rejects invalid reasons and interview payloads", async () => { const { default: piIntercomExtension } = await import("./index.ts");