Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const SUBAGENT_RESULT_INTERCOM_DELIVERY_EVENT = "subagent:result-intercom-delive
const INBOUND_FLUSH_DELAY_MS = 200;
const INBOUND_IDLE_RETRY_MS = 500;
const DEFAULT_UNNAMED_SESSION_ALIAS_PREFIX = "subagent-chat";
const SESSION_NOT_FOUND_REASON = "Session not found";
const SUBAGENT_ORCHESTRATOR_TARGET_ENV = "PI_SUBAGENT_ORCHESTRATOR_TARGET";
const SUBAGENT_RUN_ID_ENV = "PI_SUBAGENT_RUN_ID";
const SUBAGENT_CHILD_AGENT_ENV = "PI_SUBAGENT_CHILD_AGENT";
Expand Down Expand Up @@ -409,6 +410,12 @@ function previewText(value: unknown, maxLength = 72): string | undefined {
function firstTextContent(result: { content?: Array<{ type: string; text?: string }> }): string {
return result.content?.find((item) => item.type === "text" && typeof item.text === "string")?.text?.replace(/\*\*/g, "") ?? "";
}
function isDisconnectedSessionFailure(reason: string | undefined): boolean {
return reason === SESSION_NOT_FOUND_REASON;
}
function formatPendingAskDismissedNote(dismissed: boolean): string {
return dismissed ? " The pending ask was removed because the sender is no longer connected." : "";
}
export default function piIntercomExtension(pi: ExtensionAPI) {
let client: IntercomClient | null = null;
const config: IntercomConfig = loadConfig();
Expand Down Expand Up @@ -1426,10 +1433,13 @@ Usage:
});
if (!result.delivered) {
const errorText = result.reason ?? "Session may not exist or has disconnected.";
const dismissedPendingAsk = typeof replyTo === "string" && isDisconnectedSessionFailure(result.reason)
? replyTracker.dismissPendingAsk(replyTo)
: false;
return {
content: [{ type: "text", text: `Message to "${to}" was not delivered: ${errorText}` }],
content: [{ type: "text", text: `Message to "${to}" was not delivered: ${errorText}${formatPendingAskDismissedNote(dismissedPendingAsk)}` }],
isError: true,
details: { messageId: result.id, delivered: false, reason: result.reason },
details: { messageId: result.id, delivered: false, reason: result.reason, pendingAskDismissed: dismissedPendingAsk },
};
}
pi.appendEntry("intercom_sent", {
Expand Down Expand Up @@ -1571,7 +1581,10 @@ Usage:
}

try {
const target = replyTracker.resolveReplyTarget({ to });
const target = replyTracker.resolveReplyTarget({
to,
replyTo: typeof replyTo === "string" ? replyTo : undefined,
});
if (target.from.id === connectedClient.sessionId) {
return {
content: [{ type: "text", text: "Cannot message the current session" }],
Expand All @@ -1585,10 +1598,19 @@ Usage:
});
if (!result.delivered) {
const errorText = result.reason ?? "Session may not exist or has disconnected.";
const dismissedPendingAsk = isDisconnectedSessionFailure(result.reason)
? replyTracker.dismissPendingAsk(target.message.id)
: false;
return {
content: [{ type: "text", text: `Reply to "${target.from.name || target.from.id}" was not delivered: ${errorText}` }],
content: [{ type: "text", text: `Reply to "${target.from.name || target.from.id}" was not delivered: ${errorText}${formatPendingAskDismissedNote(dismissedPendingAsk)}` }],
isError: true,
details: { messageId: result.id, delivered: false, reason: result.reason },
details: {
messageId: result.id,
delivered: false,
reason: result.reason,
replyTo: target.message.id,
pendingAskDismissed: dismissedPendingAsk,
},
};
}
replyTracker.markReplied(target.message.id);
Expand Down
46 changes: 46 additions & 0 deletions intercom.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -967,3 +967,49 @@ test("async ask can be replied to later from the single pending ask fallback", {
await cleanup();
}
});

test("reply dismisses a pending ask when the sender disconnected", { concurrency: false }, async () => {
const { planner, cleanup } = await setupClients();
const { default: piIntercomExtension } = await import("./index.ts");
const harness = createExtensionHarness("reply-worker", { hasUI: true });

try {
piIntercomExtension(harness.pi as never);
await harness.emitLifecycle("session_start");

const target = await waitForSessionByName(planner, "reply-worker");
const delivered = await planner.send(target.id, {
messageId: "stale-ask",
text: "Please reply after I disconnect.",
expectsReply: true,
});
assert.equal(delivered.delivered, true);

const deadline = Date.now() + 2000;
while (harness.sentMessages.length === 0 && Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, 25));
}
assert.equal(harness.sentMessages.length, 1);

const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!;
const pendingBeforeReply = await intercomTool.execute("pending-before", { action: "pending" }, new AbortController().signal, undefined, harness.ctx);
assert.equal(pendingBeforeReply.isError, false);
assert.match(pendingBeforeReply.content[0]?.text ?? "", /stale-ask/);

await planner.disconnect();

const replyResult = await intercomTool.execute("reply-stale", { action: "reply", message: "ok" }, new AbortController().signal, undefined, harness.ctx);
assert.equal(replyResult.isError, true);
assert.match(replyResult.content[0]?.text ?? "", /Session not found/);
assert.match(replyResult.content[0]?.text ?? "", /pending ask was removed/);
assert.equal(replyResult.details?.replyTo, "stale-ask");
assert.equal(replyResult.details?.pendingAskDismissed, true);

const pendingAfterReply = await intercomTool.execute("pending-after", { action: "pending" }, new AbortController().signal, undefined, harness.ctx);
assert.equal(pendingAfterReply.isError, false);
assert.match(pendingAfterReply.content[0]?.text ?? "", /No unresolved inbound asks/);
} finally {
await harness.emitLifecycle("session_shutdown");
await cleanup();
}
});
30 changes: 30 additions & 0 deletions reply-tracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ test("reply with to resolves matching pending ask", () => {
assert.equal(tracker.resolveReplyTarget({ to: "planner-id" }, 1002).message.id, "ask-1");
});

test("reply with replyTo resolves a specific pending ask", () => {
const tracker = new ReplyTracker();
tracker.recordIncomingMessage(createSession("planner-id", "planner"), createMessage("ask-1", "First"), 1000);
tracker.recordIncomingMessage(createSession("reviewer-id", "reviewer"), createMessage("ask-2", "Second"), 1001);

assert.equal(tracker.resolveReplyTarget({ replyTo: "ask-2" }, 1002).from.id, "reviewer-id");
assert.throws(() => tracker.resolveReplyTarget({ replyTo: "missing-ask" }, 1002), /No pending ask with replyTo/);
});

test("reply with replyTo can resolve the current turn context", () => {
const tracker = new ReplyTracker();
const context = tracker.recordIncomingMessage(createSession("planner-id", "planner"), createMessage("ask-1", "Need a decision"), 1000);
tracker.queueTurnContext(context);
tracker.beginTurn(1001);

assert.equal(tracker.resolveReplyTarget({ replyTo: "ask-1" }, 1002).from.id, "planner-id");
});

test("reply errors when no context and no pending asks", () => {
const tracker = new ReplyTracker();

Expand All @@ -75,3 +93,15 @@ test("reply removes pending ask after successful reply", () => {

assert.deepEqual(tracker.listPending(1001), []);
});

test("dismiss removes a stale pending ask and current turn context", () => {
const tracker = new ReplyTracker();
const context = tracker.recordIncomingMessage(createSession("planner-id", "planner"), createMessage("ask-1", "Need a decision"), 1000);
tracker.queueTurnContext(context);
tracker.beginTurn(1001);

assert.equal(tracker.dismissPendingAsk("ask-1"), true);
assert.equal(tracker.dismissPendingAsk("ask-1"), false);
assert.deepEqual(tracker.listPending(1002), []);
assert.throws(() => tracker.resolveReplyTarget({}, 1002), /No active intercom context to reply to/);
});
32 changes: 27 additions & 5 deletions reply-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,21 @@ export class ReplyTracker {
this.currentTurnContext = null;
}

resolveReplyTarget(options: { to?: string }, now = Date.now()): IntercomContext {
resolveReplyTarget(options: { to?: string; replyTo?: string }, now = Date.now()): IntercomContext {
this.pruneExpired(now);

if (options.replyTo) {
const matchingCurrentContext = this.currentTurnContext?.message.id === options.replyTo
? this.currentTurnContext
: null;
const matchingPendingAsk = this.pendingAsks.get(options.replyTo) ?? null;
const target = matchingCurrentContext ?? matchingPendingAsk;
if (!target) {
throw new Error(`No pending ask with replyTo \"${options.replyTo}\"`);
}
return target;
}

if (this.currentTurnContext) {
return this.currentTurnContext;
}
Expand Down Expand Up @@ -81,10 +93,11 @@ export class ReplyTracker {
}

markReplied(replyTo: string): void {
this.pendingAsks.delete(replyTo);
if (this.currentTurnContext?.message.id === replyTo) {
this.currentTurnContext = null;
}
this.removePendingAsk(replyTo);
}

dismissPendingAsk(replyTo: string): boolean {
return this.removePendingAsk(replyTo);
}

listPending(now = Date.now()): IntercomContext[] {
Expand All @@ -99,4 +112,13 @@ export class ReplyTracker {
}
}
}

private removePendingAsk(replyTo: string): boolean {
const removedPendingAsk = this.pendingAsks.delete(replyTo);
const removedCurrentContext = this.currentTurnContext?.message.id === replyTo;
if (removedCurrentContext) {
this.currentTurnContext = null;
}
return removedPendingAsk || removedCurrentContext;
}
}