Skip to content

Commit afa8dd3

Browse files
committed
allow racing multiple events
in progress
1 parent 19f6d93 commit afa8dd3

File tree

13 files changed

+725
-81
lines changed

13 files changed

+725
-81
lines changed

example/convex/_generated/api.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type * as example from "../example.js";
1414
import type * as inlineTest from "../inlineTest.js";
1515
import type * as nestedWorkflow from "../nestedWorkflow.js";
1616
import type * as passingSignals from "../passingSignals.js";
17+
import type * as raceEvents from "../raceEvents.js";
1718
import type * as transcription from "../transcription.js";
1819
import type * as userConfirmation from "../userConfirmation.js";
1920

@@ -30,6 +31,7 @@ declare const fullApi: ApiFromModules<{
3031
inlineTest: typeof inlineTest;
3132
nestedWorkflow: typeof nestedWorkflow;
3233
passingSignals: typeof passingSignals;
34+
raceEvents: typeof raceEvents;
3335
transcription: typeof transcription;
3436
userConfirmation: typeof userConfirmation;
3537
}>;

example/convex/raceEvents.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import {
2+
defineEvent,
3+
vOnComplete,
4+
vWorkflowId,
5+
WorkflowManager,
6+
} from "@convex-dev/workflow";
7+
import { v } from "convex/values";
8+
import { components, internal } from "./_generated/api";
9+
import { internalMutation } from "./_generated/server";
10+
import { literals } from "convex-helpers/validators";
11+
12+
const workflow = new WorkflowManager(components.workflow);
13+
14+
const proposals = {
15+
A: "Proposal A",
16+
B: "Proposal B",
17+
C: "Proposal C",
18+
};
19+
const vProposal = literals("A", "B", "C");
20+
export const approvalEvent = defineEvent({
21+
name: "approval",
22+
validator: v.object({ proposal: vProposal }),
23+
});
24+
export const rejectionEvent = defineEvent({
25+
name: "rejection",
26+
validator: v.object({ reason: v.string() }),
27+
});
28+
29+
export const raceExample = workflow.define({
30+
args: {},
31+
returns: v.string(),
32+
handler: async (step): Promise<string> => {
33+
const result = await step.raceEvents([approvalEvent, rejectionEvent], {
34+
name: "waitForDecision",
35+
timeout: 15 * 60 * 1000, // 15 minutes
36+
});
37+
38+
switch (result.name) {
39+
case "approval": {
40+
return `Approved. Selected: ${proposals[result.value.proposal]}`;
41+
}
42+
case "rejection": {
43+
const val = result.value;
44+
return `Rejected: ${val.reason}`;
45+
}
46+
}
47+
},
48+
});
49+
50+
export const sendApproval = internalMutation({
51+
args: { workflowId: vWorkflowId, proposal: vProposal },
52+
handler: async (ctx, args) => {
53+
await workflow.sendEvent(ctx, {
54+
...approvalEvent,
55+
workflowId: args.workflowId,
56+
value: { proposal: args.proposal },
57+
});
58+
},
59+
});
60+
export const sendRejection = internalMutation({
61+
args: { workflowId: vWorkflowId, reason: v.string() },
62+
handler: async (ctx, args) => {
63+
await workflow.sendEvent(ctx, {
64+
...rejectionEvent,
65+
workflowId: args.workflowId,
66+
value: { reason: args.reason },
67+
});
68+
},
69+
});
70+
71+
export const startWorkflow = internalMutation({
72+
args: {},
73+
handler: async (ctx) => {
74+
const id = await workflow.start(
75+
ctx,
76+
internal.raceEvents.raceExample,
77+
{},
78+
{
79+
onComplete: internal.raceEvents.completeWorkflow,
80+
},
81+
);
82+
await ctx.db.insert("flows", { workflowId: id, in: "", out: null });
83+
},
84+
});
85+
export const completeWorkflow = internalMutation({
86+
args: vOnComplete(v.any()),
87+
handler: async (ctx, args): Promise<void> => {
88+
const flow = await ctx.db
89+
.query("flows")
90+
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
91+
.first();
92+
if (!flow) {
93+
throw new Error(`Flow not found: ${args.workflowId}`);
94+
}
95+
await ctx.db.patch("flows", flow._id, { out: args.result });
96+
},
97+
});
98+
99+
export const raceWithoutValidators = workflow.define({
100+
args: {},
101+
returns: v.string(),
102+
handler: async (step): Promise<string> => {
103+
const result = await step.raceEvents([{ name: "go" }, { name: "stop" }]);
104+
if (result.name === "go") {
105+
return "Proceeding";
106+
}
107+
return "Stopped";
108+
},
109+
});
110+
export const sendGo = internalMutation({
111+
args: { workflowId: vWorkflowId },
112+
handler: async (ctx, args) => {
113+
await workflow.sendEvent(ctx, {
114+
name: "go",
115+
workflowId: args.workflowId,
116+
});
117+
},
118+
});
119+
export const sendStop = internalMutation({
120+
args: { workflowId: vWorkflowId },
121+
handler: async (ctx, args) => {
122+
await workflow.sendEvent(ctx, {
123+
name: "stop",
124+
workflowId: args.workflowId,
125+
});
126+
},
127+
});

src/client/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ export {
4040
vEventId,
4141
vWorkflowId,
4242
vWorkflowStep,
43+
vOnComplete,
4344
type EventId,
4445
type WorkflowId,
4546
type WorkflowStep,
4647
} from "../types.js";
48+
export type { RaceResult } from "./workflowContext.js";
4749
export type { RunOptions, WorkflowCtx } from "./workflowContext.js";
4850

4951
export type CallbackOptions = {

src/client/step.ts

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ export type StepRequest = {
4444
| {
4545
kind: "sleep";
4646
args: Record<string, never>;
47+
}
48+
| {
49+
kind: "race";
50+
args: {
51+
events: Array<{ name: string }>;
52+
timeout?: number;
53+
failure?: "fail" | "retry" | "discard";
54+
};
4755
};
4856
retry: RetryBehavior | boolean | undefined;
4957
inline: boolean;
@@ -189,31 +197,56 @@ export class StepExecutor {
189197
startedAt: this.now,
190198
completedAt: runResult ? this.now : undefined,
191199
} satisfies Omit<Step, "kind">;
192-
const step =
193-
target.kind === "function"
194-
? {
195-
kind: "function" as const,
196-
functionType: target.functionType,
197-
handle: await createFunctionHandle(target.function),
198-
...commonFields,
199-
}
200-
: target.kind === "workflow"
201-
? {
202-
kind: "workflow" as const,
203-
handle: await createFunctionHandle(target.function),
204-
...commonFields,
205-
}
206-
: target.kind === "event"
207-
? {
208-
kind: "event" as const,
209-
eventId: target.args.eventId,
210-
...commonFields,
211-
args: target.args,
212-
}
213-
: {
214-
kind: "sleep" as const,
215-
...commonFields,
216-
};
200+
let step;
201+
switch (target.kind) {
202+
case "function": {
203+
step = {
204+
kind: "function" as const,
205+
functionType: target.functionType,
206+
handle: await createFunctionHandle(target.function),
207+
...commonFields,
208+
};
209+
break;
210+
}
211+
case "workflow": {
212+
step = {
213+
kind: "workflow" as const,
214+
handle: await createFunctionHandle(target.function),
215+
...commonFields,
216+
};
217+
break;
218+
}
219+
case "event": {
220+
step = {
221+
kind: "event" as const,
222+
eventId: target.args.eventId,
223+
...commonFields,
224+
args: target.args,
225+
};
226+
break;
227+
}
228+
case "sleep": {
229+
step = {
230+
kind: "sleep" as const,
231+
...commonFields,
232+
};
233+
break;
234+
}
235+
case "race": {
236+
step = {
237+
kind: "race" as const,
238+
events: target.args.events,
239+
timeout:
240+
target.args.timeout !== undefined
241+
? { ms: target.args.timeout }
242+
: undefined,
243+
failure: target.args.failure,
244+
...commonFields,
245+
args: target.args,
246+
};
247+
break;
248+
}
249+
}
217250
return {
218251
retry: message.retry,
219252
schedulerOptions: message.schedulerOptions,

src/client/workflowContext.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,23 @@ import type { Validator } from "convex/values";
1212
import type { EventId, SchedulerOptions, WorkflowId } from "../types.js";
1313
import { safeFunctionName } from "./safeFunctionName.js";
1414
import type { StepRequest } from "./step.js";
15+
import { assert } from "convex-helpers";
16+
17+
export type RaceResult<
18+
T extends ReadonlyArray<{
19+
name: string;
20+
validator?: Validator<any, any, any>;
21+
}>,
22+
> = {
23+
[K in keyof T]: T[K] extends {
24+
name: infer N extends string;
25+
validator: Validator<infer V, any, any>;
26+
}
27+
? { name: N; value: V }
28+
: T[K] extends { name: infer N extends string }
29+
? { name: N; value: unknown }
30+
: never;
31+
}[number];
1532

1633
export type RunOptions = {
1734
/**
@@ -125,6 +142,33 @@ export type WorkflowCtx = {
125142
* @param opts - Optionally name the step. Default: "sleep"
126143
*/
127144
sleep(duration: number, opts?: { name?: string }): Promise<void>;
145+
146+
/**
147+
* Waits for any one of multiple events, returning the first that fires.
148+
*
149+
* Each event in the array must have a unique name. The workflow blocks
150+
* until an event with one of the given names is sent, then returns the
151+
* matched event's name and value.
152+
*
153+
* @param events - Array of event definitions, each with a unique `name`
154+
* and an optional `validator` to parse the event's payload.
155+
* @param opts - Optional options, including a custom step `name` for
156+
* observability (defaults to `"race(name1, name2, ...)"`) and a
157+
* `timeout` in milliseconds after which the race rejects with an error.
158+
*/
159+
raceEvents<
160+
const T extends ReadonlyArray<{
161+
name: string;
162+
validator?: Validator<any, any, any>;
163+
}>,
164+
>(
165+
events: T,
166+
opts?: {
167+
name?: string;
168+
timeout?: number;
169+
failure?: "fail" | "retry" | "discard";
170+
},
171+
): Promise<RaceResult<T>>;
128172
};
129173

130174
export type OptionalRestArgs<
@@ -197,6 +241,58 @@ export function createWorkflowCtx(
197241
}
198242
return result as any;
199243
},
244+
245+
raceEvents: async <
246+
T extends ReadonlyArray<{
247+
name: string;
248+
validator?: Validator<any, any, any>;
249+
}>,
250+
>(
251+
events: T,
252+
opts?: {
253+
name?: string;
254+
timeout?: number;
255+
failure?: "fail" | "retry" | "discard";
256+
},
257+
) => {
258+
assert(events.length > 0, "At least one event must be specified.");
259+
assert(
260+
new Set(events.map((e) => e.name)).size === events.length,
261+
"All events must have unique names.",
262+
);
263+
assert(
264+
opts?.timeout === undefined || opts.timeout > 0,
265+
"Timeout must be a positive number.",
266+
);
267+
const result = await run(sender, {
268+
name: opts?.name ?? `race(${events.map((e) => e.name).join(", ")})`,
269+
target: {
270+
kind: "race",
271+
args: {
272+
events: events.map((e) => ({ name: e.name })),
273+
timeout: opts?.timeout,
274+
failure: opts?.failure,
275+
},
276+
},
277+
retry: undefined,
278+
inline: false,
279+
schedulerOptions: {},
280+
});
281+
const winner = events.find((e) => e.name === (result as any).eventName);
282+
if (winner?.validator) {
283+
return {
284+
name: winner.name as RaceResult<T>["name"],
285+
value: parse(
286+
winner.validator,
287+
(result as any).value,
288+
) as RaceResult<T>["value"],
289+
} as RaceResult<T>;
290+
}
291+
return {
292+
name: (result as any).eventName,
293+
value: (result as any).value,
294+
} as any;
295+
},
200296
} satisfies WorkflowCtx;
201297
}
202298

src/component/_generated/api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type * as journal from "../journal.js";
1313
import type * as logging from "../logging.js";
1414
import type * as model from "../model.js";
1515
import type * as pool from "../pool.js";
16+
import type * as race from "../race.js";
1617
import type * as utils from "../utils.js";
1718
import type * as workflow from "../workflow.js";
1819

@@ -29,6 +30,7 @@ const fullApi: ApiFromModules<{
2930
logging: typeof logging;
3031
model: typeof model;
3132
pool: typeof pool;
33+
race: typeof race;
3234
utils: typeof utils;
3335
workflow: typeof workflow;
3436
}> = anyApi as any;

0 commit comments

Comments
 (0)