Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ function toError(error: unknown): Error {
return error instanceof Error ? error : new Error(String(error));
}

class ReplyWaiterBusyError extends Error {
constructor() {
super("Already waiting for a reply");
this.name = "ReplyWaiterBusyError";
}
}

function isReplyWaiterBusyError(error: unknown): error is ReplyWaiterBusyError {
return error instanceof ReplyWaiterBusyError;
}

function alreadyWaitingForReplyResult() {
return {
content: [{ type: "text" as const, text: "Already waiting for a reply" }],
isError: true,
details: { error: true },
};
}

function formatAttachments(attachments: Attachment[]): string {
let text = "";
for (const att of attachments) {
Expand Down Expand Up @@ -438,10 +457,10 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
} | null = null;
function waitForReply(from: string, replyTo: string, signal?: AbortSignal): Promise<Message> {
if (replyWaiter) {
return Promise.reject(new Error("Already waiting for a reply"));
throw new ReplyWaiterBusyError();
}
if (signal?.aborted) {
return Promise.reject(new Error("Cancelled"));
throw new Error("Cancelled");
}
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
Expand Down Expand Up @@ -1170,20 +1189,20 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
}

if (replyWaiter) {
return {
content: [{ type: "text", text: "Already waiting for a reply" }],
isError: true,
details: { error: true },
};
return alreadyWaitingForReplyResult();
}

let replyPromise: Promise<Message> | null = null;
let ownsReplyWaiter = false;
try {
const questionId = randomUUID();
replyPromise = waitForReply(sendTo, questionId, signal);
ownsReplyWaiter = true;
replyPromise.catch(() => undefined);
if (signal?.aborted) {
rejectReplyWaiter(new Error("Cancelled"));
if (ownsReplyWaiter) {
rejectReplyWaiter(new Error("Cancelled"));
}
try {
await replyPromise;
} catch {
Expand All @@ -1205,7 +1224,9 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
});
if (!sendResult.delivered) {
const errorText = sendResult.reason ?? "Session may not exist or has disconnected.";
rejectReplyWaiter(new Error(`Message to "${metadata.orchestratorTarget}" was not delivered: ${errorText}`));
if (ownsReplyWaiter) {
rejectReplyWaiter(new Error(`Message to "${metadata.orchestratorTarget}" was not delivered: ${errorText}`));
}
if (replyPromise) {
try {
await replyPromise;
Expand Down Expand Up @@ -1251,7 +1272,12 @@ export default function piIntercomExtension(pi: ExtensionAPI) {
: {}),
};
} catch (error) {
rejectReplyWaiter(toError(error));
if (isReplyWaiterBusyError(error)) {
return alreadyWaitingForReplyResult();
}
if (ownsReplyWaiter) {
rejectReplyWaiter(toError(error));
}
if (replyPromise) {
try {
await replyPromise;
Expand Down Expand Up @@ -1465,11 +1491,7 @@ Usage:
}

if (replyWaiter) {
return {
content: [{ type: "text", text: "Already waiting for a reply" }],
isError: true,
details: { error: true },
};
return alreadyWaitingForReplyResult();
}

if (_signal?.aborted) {
Expand All @@ -1480,6 +1502,7 @@ Usage:
};
}
let replyPromise: Promise<Message> | null = null;
let ownsReplyWaiter = false;

try {
const sendTo = await resolveSessionTarget(connectedClient, to) ?? to;
Expand All @@ -1497,8 +1520,13 @@ Usage:
details: { error: true },
};
}
if (replyWaiter) {
return alreadyWaitingForReplyResult();
}
const questionId = randomUUID();
replyPromise = waitForReply(sendTo, questionId, _signal);
ownsReplyWaiter = true;
replyPromise.catch(() => undefined);
const sendResult = await connectedClient.send(sendTo, {
messageId: questionId,
text: message,
Expand All @@ -1509,7 +1537,9 @@ Usage:

if (!sendResult.delivered) {
const errorText = sendResult.reason ?? "Session may not exist or has disconnected.";
rejectReplyWaiter(new Error(`Message to "${to}" was not delivered: ${errorText}`));
if (ownsReplyWaiter) {
rejectReplyWaiter(new Error(`Message to "${to}" was not delivered: ${errorText}`));
}
if (replyPromise) {
try {
await replyPromise;
Expand Down Expand Up @@ -1545,7 +1575,12 @@ Usage:
isError: false,
};
} catch (error) {
rejectReplyWaiter(toError(error));
if (isReplyWaiterBusyError(error)) {
return alreadyWaitingForReplyResult();
}
if (ownsReplyWaiter) {
rejectReplyWaiter(toError(error));
}
if (replyPromise) {
try {
await replyPromise;
Expand Down
155 changes: 155 additions & 0 deletions intercom.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,86 @@ test("contact supervisor tool renders reason and reply state", async () => {
});
});

test("concurrent intercom asks fail safely without crashing or clobbering the winner", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { planner, orchestrator, cleanup } = await setupClients();
const harness = createExtensionHarness("ask-worker");
const escapedErrors: unknown[] = [];
const plannerMessages: Array<{ from: SessionInfo; message: Message }> = [];
const orchestratorMessages: Array<{ from: SessionInfo; message: Message }> = [];
const onUnhandledRejection = (reason: unknown) => {
escapedErrors.push(reason);
};
const onUncaughtException = (error: unknown) => {
escapedErrors.push(error);
};
const onPlannerMessage = (from: SessionInfo, message: Message) => {
plannerMessages.push({ from, message });
};
const onOrchestratorMessage = (from: SessionInfo, message: Message) => {
orchestratorMessages.push({ from, message });
};

process.prependListener("unhandledRejection", onUnhandledRejection);
process.prependListener("uncaughtException", onUncaughtException);
planner.on("message", onPlannerMessage);
orchestrator.on("message", onOrchestratorMessage);

try {
piIntercomExtension(harness.pi as never);
await harness.emitLifecycle("session_start");
await waitForSessionByName(planner, "ask-worker");

const intercomTool = harness.tools.find((tool) => tool.name === "intercom")!;
const firstAsk = intercomTool.execute("ask-concurrent-1", {
action: "ask",
to: "planner",
message: "First concurrent question",
}, new AbortController().signal, undefined, harness.ctx);
const secondAsk = intercomTool.execute("ask-concurrent-2", {
action: "ask",
to: "orchestrator",
message: "Second concurrent question",
}, new AbortController().signal, undefined, harness.ctx);

await new Promise((resolve) => setTimeout(resolve, 100));

const deliveredMessages = [
...plannerMessages.map((entry) => ({ ...entry, client: planner, text: "planner reply" })),
...orchestratorMessages.map((entry) => ({ ...entry, client: orchestrator, text: "orchestrator reply" })),
];
for (const { client, from, message, text } of deliveredMessages) {
await client.send(from.id, { text, replyTo: message.id });
}

const results = await Promise.race([
Promise.all([firstAsk, secondAsk]),
new Promise<never>((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for concurrent asks")), 5000)),
]);
await new Promise((resolve) => setImmediate(resolve));

assert.deepEqual(escapedErrors.map((error) => error instanceof Error ? error.message : String(error)), []);
assert.equal(deliveredMessages.length, 1, "the losing ask must not send a message");
assert.equal(results.filter((result) => result.isError).length, 1);
assert.equal(results.filter((result) => !result.isError).length, 1);
assert.match(
results.find((result) => result.isError)?.content[0]?.text ?? "",
/Already waiting for a reply/,
);
assert.match(
results.find((result) => !result.isError)?.content[0]?.text ?? "",
/Reply from/,
);
} finally {
process.off("unhandledRejection", onUnhandledRejection);
process.off("uncaughtException", onUncaughtException);
planner.off("message", onPlannerMessage);
orchestrator.off("message", onOrchestratorMessage);
await harness.emitLifecycle("session_shutdown");
await cleanup();
}
});

test("sessions publish automatic lifecycle status", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { planner, cleanup } = await setupClients();
Expand Down Expand Up @@ -703,6 +783,81 @@ test("child supervisor tool resolves target and includes run metadata", { concur
}
});

test("concurrent supervisor decisions fail safely without clobbering the winner", { concurrency: false }, async () => {
const { default: piIntercomExtension } = await import("./index.ts");
const { orchestrator, cleanup } = await setupClients();
const escapedErrors: unknown[] = [];
const supervisorMessages: Array<{ from: SessionInfo; message: Message }> = [];
const onUnhandledRejection = (reason: unknown) => {
escapedErrors.push(reason);
};
const onUncaughtException = (error: unknown) => {
escapedErrors.push(error);
};
const onSupervisorMessage = (from: SessionInfo, message: Message) => {
supervisorMessages.push({ from, message });
};

process.prependListener("unhandledRejection", onUnhandledRejection);
process.prependListener("uncaughtException", onUncaughtException);
orchestrator.on("message", onSupervisorMessage);

try {
await withChildOrchestratorEnv({
orchestratorTarget: "orchestrator",
runId: "78f659a3",
agent: "worker",
index: "0",
sessionName: "subagent-worker-78f659a3-1",
}, async () => {
const harness = createExtensionHarness("subagent-worker-78f659a3-1");
piIntercomExtension(harness.pi as never);
await harness.emitLifecycle("session_start");
const supervisorTool = harness.tools.find((tool) => tool.name === "contact_supervisor")!;

const firstDecision = supervisorTool.execute("supervisor-concurrent-1", {
reason: "need_decision",
message: "First supervisor decision?",
}, new AbortController().signal, undefined, harness.ctx);
const secondDecision = supervisorTool.execute("supervisor-concurrent-2", {
reason: "need_decision",
message: "Second supervisor decision?",
}, new AbortController().signal, undefined, harness.ctx);

await new Promise((resolve) => setTimeout(resolve, 100));
for (const { from, message } of supervisorMessages) {
await orchestrator.send(from.id, { text: "Supervisor reply.", replyTo: message.id });
}

const results = await Promise.race([
Promise.all([firstDecision, secondDecision]),
new Promise<never>((_resolve, reject) => setTimeout(() => reject(new Error("Timed out waiting for supervisor decisions")), 5000)),
]);
await new Promise((resolve) => setImmediate(resolve));

assert.deepEqual(escapedErrors.map((error) => error instanceof Error ? error.message : String(error)), []);
assert.equal(supervisorMessages.length, 1, "the losing supervisor request must not send a message");
assert.equal(results.filter((result) => result.isError).length, 1);
assert.equal(results.filter((result) => !result.isError).length, 1);
assert.match(
results.find((result) => result.isError)?.content[0]?.text ?? "",
/Already waiting for a reply/,
);
assert.match(
results.find((result) => !result.isError)?.content[0]?.text ?? "",
/Reply from supervisor/,
);

await harness.emitLifecycle("session_shutdown");
});
} finally {
process.off("unhandledRejection", onUnhandledRejection);
process.off("uncaughtException", onUncaughtException);
orchestrator.off("message", onSupervisorMessage);
await cleanup();
}
});

test("child supervisor tool rejects invalid reasons and interview payloads", async () => {
const { default: piIntercomExtension } = await import("./index.ts");

Expand Down