Skip to content

Commit 260357b

Browse files
committed
allow racing multiple events
in progress
1 parent 19f6d93 commit 260357b

File tree

14 files changed

+746
-77
lines changed

14 files changed

+746
-77
lines changed

example/convex/_generated/api.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type * as example from "../example.js";
1414
import type * as inlineTest from "../inlineTest.js";
1515
import type * as nestedWorkflow from "../nestedWorkflow.js";
1616
import type * as passingSignals from "../passingSignals.js";
17+
import type * as raceEvents from "../raceEvents.js";
1718
import type * as transcription from "../transcription.js";
1819
import type * as userConfirmation from "../userConfirmation.js";
1920

@@ -30,6 +31,7 @@ declare const fullApi: ApiFromModules<{
3031
inlineTest: typeof inlineTest;
3132
nestedWorkflow: typeof nestedWorkflow;
3233
passingSignals: typeof passingSignals;
34+
raceEvents: typeof raceEvents;
3335
transcription: typeof transcription;
3436
userConfirmation: typeof userConfirmation;
3537
}>;

example/convex/raceEvents.ts

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import {
2+
defineEvent,
3+
vOnComplete,
4+
vWorkflowId,
5+
WorkflowManager,
6+
} from "@convex-dev/workflow";
7+
import { v } from "convex/values";
8+
import { components, internal } from "./_generated/api";
9+
import { internalMutation, internalQuery } from "./_generated/server";
10+
11+
const workflow = new WorkflowManager(components.workflow);
12+
13+
export const approvalEvent = defineEvent({
14+
name: "approval",
15+
validator: v.object({ proposalId: v.id("proposals") }),
16+
});
17+
export const rejectionEvent = defineEvent({
18+
name: "rejection",
19+
validator: v.object({ reason: v.string() }),
20+
});
21+
22+
export const raceExample = workflow.define({
23+
args: {},
24+
returns: v.string(),
25+
handler: async (step): Promise<string> => {
26+
await step.runMutation(internal.raceEvents.generateProposals, {
27+
workflowId: step.workflowId,
28+
});
29+
const result = await step.raceEvents([approvalEvent, rejectionEvent], {
30+
name: "waitForDecision",
31+
timeout: 15 * 60 * 1000, // 15 minutes
32+
});
33+
34+
switch (result.name) {
35+
case "approval": {
36+
const proposal = await step.runQuery(
37+
internal.raceEvents.getProposal,
38+
{
39+
workflowId: step.workflowId,
40+
proposalId: result.value.proposalId,
41+
},
42+
{ inline: true },
43+
);
44+
return `Approved. Selected: ${proposal}`;
45+
}
46+
case "rejection": {
47+
const val = result.value;
48+
return `Rejected: ${val.reason}`;
49+
}
50+
}
51+
},
52+
});
53+
54+
export const generateProposals = internalMutation({
55+
args: { workflowId: vWorkflowId },
56+
handler: async (ctx, args) => {
57+
const proposals = ["option-a", "option-b", "option-c"];
58+
await Promise.all(
59+
proposals.map((proposal) =>
60+
ctx.db.insert("proposals", { workflowId: args.workflowId, proposal }),
61+
),
62+
);
63+
},
64+
});
65+
export const getProposal = internalQuery({
66+
args: { workflowId: vWorkflowId, proposalId: v.id("proposals") },
67+
returns: v.string(),
68+
handler: async (ctx, args) => {
69+
const proposal = await ctx.db.get("proposals", args.proposalId);
70+
if (!proposal) {
71+
throw new Error(`Proposal not found: ${args.proposalId}`);
72+
}
73+
return proposal.proposal;
74+
},
75+
});
76+
77+
export const sendApproval = internalMutation({
78+
args: { workflowId: vWorkflowId, proposalId: v.id("proposals") },
79+
handler: async (ctx, args) => {
80+
await workflow.sendEvent(ctx, {
81+
...approvalEvent,
82+
workflowId: args.workflowId,
83+
value: { proposalId: args.proposalId },
84+
});
85+
},
86+
});
87+
export const sendRejection = internalMutation({
88+
args: { workflowId: vWorkflowId, reason: v.string() },
89+
handler: async (ctx, args) => {
90+
await workflow.sendEvent(ctx, {
91+
...rejectionEvent,
92+
workflowId: args.workflowId,
93+
value: { reason: args.reason },
94+
});
95+
},
96+
});
97+
98+
export const startWorkflow = internalMutation({
99+
args: {},
100+
handler: async (ctx) => {
101+
const id = await workflow.start(
102+
ctx,
103+
internal.raceEvents.raceExample,
104+
{},
105+
{
106+
onComplete: internal.raceEvents.completeWorkflow,
107+
},
108+
);
109+
await ctx.db.insert("flows", { workflowId: id, in: "", out: null });
110+
},
111+
});
112+
export const completeWorkflow = internalMutation({
113+
args: vOnComplete(v.any()),
114+
handler: async (ctx, args): Promise<void> => {
115+
const flow = await ctx.db
116+
.query("flows")
117+
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
118+
.first();
119+
if (!flow) {
120+
throw new Error(`Flow not found: ${args.workflowId}`);
121+
}
122+
await ctx.db.patch("flows", flow._id, { out: args.result });
123+
},
124+
});
125+
126+
export const raceWithoutValidators = workflow.define({
127+
args: {},
128+
returns: v.string(),
129+
handler: async (step): Promise<string> => {
130+
const result = await step.raceEvents([{ name: "go" }, { name: "stop" }]);
131+
if (result.name === "go") {
132+
return "Proceeding";
133+
}
134+
return "Stopped";
135+
},
136+
});
137+
export const sendGo = internalMutation({
138+
args: { workflowId: vWorkflowId },
139+
handler: async (ctx, args) => {
140+
await workflow.sendEvent(ctx, {
141+
name: "go",
142+
workflowId: args.workflowId,
143+
});
144+
},
145+
});
146+
export const sendStop = internalMutation({
147+
args: { workflowId: vWorkflowId },
148+
handler: async (ctx, args) => {
149+
await workflow.sendEvent(ctx, {
150+
name: "stop",
151+
workflowId: args.workflowId,
152+
});
153+
},
154+
});

example/convex/schema.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,8 @@ export default defineSchema({
88
workflowId: vWorkflowId,
99
out: v.any(),
1010
}).index("workflowId", ["workflowId"]),
11+
proposals: defineTable({
12+
workflowId: vWorkflowId,
13+
proposal: v.string(),
14+
}).index("workflowId", ["workflowId"]),
1115
});

src/client/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ export {
4040
vEventId,
4141
vWorkflowId,
4242
vWorkflowStep,
43+
vOnComplete,
4344
type EventId,
4445
type WorkflowId,
4546
type WorkflowStep,
4647
} from "../types.js";
48+
export type { RaceResult } from "./workflowContext.js";
4749
export type { RunOptions, WorkflowCtx } from "./workflowContext.js";
4850

4951
export type CallbackOptions = {

src/client/step.ts

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ export type StepRequest = {
4444
| {
4545
kind: "sleep";
4646
args: Record<string, never>;
47+
}
48+
| {
49+
kind: "race";
50+
args: {
51+
events: Array<{ name: string }>;
52+
timeout?: number;
53+
failure?: "fail" | "retry" | "discard";
54+
};
4755
};
4856
retry: RetryBehavior | boolean | undefined;
4957
inline: boolean;
@@ -189,31 +197,56 @@ export class StepExecutor {
189197
startedAt: this.now,
190198
completedAt: runResult ? this.now : undefined,
191199
} satisfies Omit<Step, "kind">;
192-
const step =
193-
target.kind === "function"
194-
? {
195-
kind: "function" as const,
196-
functionType: target.functionType,
197-
handle: await createFunctionHandle(target.function),
198-
...commonFields,
199-
}
200-
: target.kind === "workflow"
201-
? {
202-
kind: "workflow" as const,
203-
handle: await createFunctionHandle(target.function),
204-
...commonFields,
205-
}
206-
: target.kind === "event"
207-
? {
208-
kind: "event" as const,
209-
eventId: target.args.eventId,
210-
...commonFields,
211-
args: target.args,
212-
}
213-
: {
214-
kind: "sleep" as const,
215-
...commonFields,
216-
};
200+
let step;
201+
switch (target.kind) {
202+
case "function": {
203+
step = {
204+
kind: "function" as const,
205+
functionType: target.functionType,
206+
handle: await createFunctionHandle(target.function),
207+
...commonFields,
208+
};
209+
break;
210+
}
211+
case "workflow": {
212+
step = {
213+
kind: "workflow" as const,
214+
handle: await createFunctionHandle(target.function),
215+
...commonFields,
216+
};
217+
break;
218+
}
219+
case "event": {
220+
step = {
221+
kind: "event" as const,
222+
eventId: target.args.eventId,
223+
...commonFields,
224+
args: target.args,
225+
};
226+
break;
227+
}
228+
case "sleep": {
229+
step = {
230+
kind: "sleep" as const,
231+
...commonFields,
232+
};
233+
break;
234+
}
235+
case "race": {
236+
step = {
237+
kind: "race" as const,
238+
events: target.args.events,
239+
timeout:
240+
target.args.timeout !== undefined
241+
? { ms: target.args.timeout }
242+
: undefined,
243+
failure: target.args.failure,
244+
...commonFields,
245+
args: target.args,
246+
};
247+
break;
248+
}
249+
}
217250
return {
218251
retry: message.retry,
219252
schedulerOptions: message.schedulerOptions,

0 commit comments

Comments
 (0)