From 6277f23db397868e581b5e5264dd90720dad49e9 Mon Sep 17 00:00:00 2001 From: adityavkk Date: Mon, 8 Jun 2026 08:26:23 -0400 Subject: [PATCH] fix: dismiss stale pending asks --- index.ts | 32 +++++++++++++++++++++---- intercom.integration.test.ts | 46 ++++++++++++++++++++++++++++++++++++ reply-tracker.test.ts | 30 +++++++++++++++++++++++ reply-tracker.ts | 32 +++++++++++++++++++++---- 4 files changed, 130 insertions(+), 10 deletions(-) diff --git a/index.ts b/index.ts index 81340c0..0600c44 100644 --- a/index.ts +++ b/index.ts @@ -17,6 +17,7 @@ const SUBAGENT_RESULT_INTERCOM_DELIVERY_EVENT = "subagent:result-intercom-delive const INBOUND_FLUSH_DELAY_MS = 200; const INBOUND_IDLE_RETRY_MS = 500; const DEFAULT_UNNAMED_SESSION_ALIAS_PREFIX = "subagent-chat"; +const SESSION_NOT_FOUND_REASON = "Session not found"; const SUBAGENT_ORCHESTRATOR_TARGET_ENV = "PI_SUBAGENT_ORCHESTRATOR_TARGET"; const SUBAGENT_RUN_ID_ENV = "PI_SUBAGENT_RUN_ID"; const SUBAGENT_CHILD_AGENT_ENV = "PI_SUBAGENT_CHILD_AGENT"; @@ -409,6 +410,12 @@ function previewText(value: unknown, maxLength = 72): string | undefined { function firstTextContent(result: { content?: Array<{ type: string; text?: string }> }): string { return result.content?.find((item) => item.type === "text" && typeof item.text === "string")?.text?.replace(/\*\*/g, "") ?? ""; } +function isDisconnectedSessionFailure(reason: string | undefined): boolean { + return reason === SESSION_NOT_FOUND_REASON; +} +function formatPendingAskDismissedNote(dismissed: boolean): string { + return dismissed ? " The pending ask was removed because the sender is no longer connected." : ""; +} export default function piIntercomExtension(pi: ExtensionAPI) { let client: IntercomClient | null = null; const config: IntercomConfig = loadConfig(); @@ -1426,10 +1433,13 @@ Usage: }); if (!result.delivered) { const errorText = result.reason ?? "Session may not exist or has disconnected."; + const dismissedPendingAsk = typeof replyTo === "string" && isDisconnectedSessionFailure(result.reason) + ? replyTracker.dismissPendingAsk(replyTo) + : false; return { - content: [{ type: "text", text: `Message to "${to}" was not delivered: ${errorText}` }], + content: [{ type: "text", text: `Message to "${to}" was not delivered: ${errorText}${formatPendingAskDismissedNote(dismissedPendingAsk)}` }], isError: true, - details: { messageId: result.id, delivered: false, reason: result.reason }, + details: { messageId: result.id, delivered: false, reason: result.reason, pendingAskDismissed: dismissedPendingAsk }, }; } pi.appendEntry("intercom_sent", { @@ -1571,7 +1581,10 @@ Usage: } try { - const target = replyTracker.resolveReplyTarget({ to }); + const target = replyTracker.resolveReplyTarget({ + to, + replyTo: typeof replyTo === "string" ? replyTo : undefined, + }); if (target.from.id === connectedClient.sessionId) { return { content: [{ type: "text", text: "Cannot message the current session" }], @@ -1585,10 +1598,19 @@ Usage: }); if (!result.delivered) { const errorText = result.reason ?? "Session may not exist or has disconnected."; + const dismissedPendingAsk = isDisconnectedSessionFailure(result.reason) + ? replyTracker.dismissPendingAsk(target.message.id) + : false; return { - content: [{ type: "text", text: `Reply to "${target.from.name || target.from.id}" was not delivered: ${errorText}` }], + content: [{ type: "text", text: `Reply to "${target.from.name || target.from.id}" was not delivered: ${errorText}${formatPendingAskDismissedNote(dismissedPendingAsk)}` }], isError: true, - details: { messageId: result.id, delivered: false, reason: result.reason }, + details: { + messageId: result.id, + delivered: false, + reason: result.reason, + replyTo: target.message.id, + pendingAskDismissed: dismissedPendingAsk, + }, }; } replyTracker.markReplied(target.message.id); diff --git a/intercom.integration.test.ts b/intercom.integration.test.ts index 69dce14..b3453e4 100644 --- a/intercom.integration.test.ts +++ b/intercom.integration.test.ts @@ -967,3 +967,49 @@ test("async ask can be replied to later from the single pending ask fallback", { await cleanup(); } }); + +test("reply dismisses a pending ask when the sender disconnected", { concurrency: false }, async () => { + const { planner, cleanup } = await setupClients(); + const { default: piIntercomExtension } = await import("./index.ts"); + const harness = createExtensionHarness("reply-worker", { hasUI: true }); + + try { + piIntercomExtension(harness.pi as never); + await harness.emitLifecycle("session_start"); + + const target = await waitForSessionByName(planner, "reply-worker"); + const delivered = await planner.send(target.id, { + messageId: "stale-ask", + text: "Please reply after I disconnect.", + expectsReply: true, + }); + assert.equal(delivered.delivered, true); + + const deadline = Date.now() + 2000; + while (harness.sentMessages.length === 0 && Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, 25)); + } + assert.equal(harness.sentMessages.length, 1); + + const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!; + const pendingBeforeReply = await intercomTool.execute("pending-before", { action: "pending" }, new AbortController().signal, undefined, harness.ctx); + assert.equal(pendingBeforeReply.isError, false); + assert.match(pendingBeforeReply.content[0]?.text ?? "", /stale-ask/); + + await planner.disconnect(); + + const replyResult = await intercomTool.execute("reply-stale", { action: "reply", message: "ok" }, new AbortController().signal, undefined, harness.ctx); + assert.equal(replyResult.isError, true); + assert.match(replyResult.content[0]?.text ?? "", /Session not found/); + assert.match(replyResult.content[0]?.text ?? "", /pending ask was removed/); + assert.equal(replyResult.details?.replyTo, "stale-ask"); + assert.equal(replyResult.details?.pendingAskDismissed, true); + + const pendingAfterReply = await intercomTool.execute("pending-after", { action: "pending" }, new AbortController().signal, undefined, harness.ctx); + assert.equal(pendingAfterReply.isError, false); + assert.match(pendingAfterReply.content[0]?.text ?? "", /No unresolved inbound asks/); + } finally { + await harness.emitLifecycle("session_shutdown"); + await cleanup(); + } +}); diff --git a/reply-tracker.test.ts b/reply-tracker.test.ts index 540214b..3aac099 100644 --- a/reply-tracker.test.ts +++ b/reply-tracker.test.ts @@ -53,6 +53,24 @@ test("reply with to resolves matching pending ask", () => { assert.equal(tracker.resolveReplyTarget({ to: "planner-id" }, 1002).message.id, "ask-1"); }); +test("reply with replyTo resolves a specific pending ask", () => { + const tracker = new ReplyTracker(); + tracker.recordIncomingMessage(createSession("planner-id", "planner"), createMessage("ask-1", "First"), 1000); + tracker.recordIncomingMessage(createSession("reviewer-id", "reviewer"), createMessage("ask-2", "Second"), 1001); + + assert.equal(tracker.resolveReplyTarget({ replyTo: "ask-2" }, 1002).from.id, "reviewer-id"); + assert.throws(() => tracker.resolveReplyTarget({ replyTo: "missing-ask" }, 1002), /No pending ask with replyTo/); +}); + +test("reply with replyTo can resolve the current turn context", () => { + const tracker = new ReplyTracker(); + const context = tracker.recordIncomingMessage(createSession("planner-id", "planner"), createMessage("ask-1", "Need a decision"), 1000); + tracker.queueTurnContext(context); + tracker.beginTurn(1001); + + assert.equal(tracker.resolveReplyTarget({ replyTo: "ask-1" }, 1002).from.id, "planner-id"); +}); + test("reply errors when no context and no pending asks", () => { const tracker = new ReplyTracker(); @@ -75,3 +93,15 @@ test("reply removes pending ask after successful reply", () => { assert.deepEqual(tracker.listPending(1001), []); }); + +test("dismiss removes a stale pending ask and current turn context", () => { + const tracker = new ReplyTracker(); + const context = tracker.recordIncomingMessage(createSession("planner-id", "planner"), createMessage("ask-1", "Need a decision"), 1000); + tracker.queueTurnContext(context); + tracker.beginTurn(1001); + + assert.equal(tracker.dismissPendingAsk("ask-1"), true); + assert.equal(tracker.dismissPendingAsk("ask-1"), false); + assert.deepEqual(tracker.listPending(1002), []); + assert.throws(() => tracker.resolveReplyTarget({}, 1002), /No active intercom context to reply to/); +}); diff --git a/reply-tracker.ts b/reply-tracker.ts index bddac58..602ca7a 100644 --- a/reply-tracker.ts +++ b/reply-tracker.ts @@ -48,9 +48,21 @@ export class ReplyTracker { this.currentTurnContext = null; } - resolveReplyTarget(options: { to?: string }, now = Date.now()): IntercomContext { + resolveReplyTarget(options: { to?: string; replyTo?: string }, now = Date.now()): IntercomContext { this.pruneExpired(now); + if (options.replyTo) { + const matchingCurrentContext = this.currentTurnContext?.message.id === options.replyTo + ? this.currentTurnContext + : null; + const matchingPendingAsk = this.pendingAsks.get(options.replyTo) ?? null; + const target = matchingCurrentContext ?? matchingPendingAsk; + if (!target) { + throw new Error(`No pending ask with replyTo \"${options.replyTo}\"`); + } + return target; + } + if (this.currentTurnContext) { return this.currentTurnContext; } @@ -81,10 +93,11 @@ export class ReplyTracker { } markReplied(replyTo: string): void { - this.pendingAsks.delete(replyTo); - if (this.currentTurnContext?.message.id === replyTo) { - this.currentTurnContext = null; - } + this.removePendingAsk(replyTo); + } + + dismissPendingAsk(replyTo: string): boolean { + return this.removePendingAsk(replyTo); } listPending(now = Date.now()): IntercomContext[] { @@ -99,4 +112,13 @@ export class ReplyTracker { } } } + + private removePendingAsk(replyTo: string): boolean { + const removedPendingAsk = this.pendingAsks.delete(replyTo); + const removedCurrentContext = this.currentTurnContext?.message.id === replyTo; + if (removedCurrentContext) { + this.currentTurnContext = null; + } + return removedPendingAsk || removedCurrentContext; + } }