Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a new "race" workflow step and client API to race named events (typed or untyped) with optional timeout and failure strategies; implements runtime handling, DB event bookkeeping, timeout mutation, schema/types/validators, journal/start logic, and an example demonstrating usage. Changes
Sequence DiagramsequenceDiagram
participant Client
participant WorkflowCtx
participant Journal as Journal/Scheduler
participant EventHandler as Event System
participant RaceTimeout as Race Timeout
participant DB as Database
Client->>WorkflowCtx: raceEvents([evtA, evtB], {timeout})
WorkflowCtx->>WorkflowCtx: validate names, timeout
WorkflowCtx->>Journal: run step(kind:"race", events, timeout)
Journal->>DB: query sent events for workflow
alt winner already sent
Journal->>DB: set raceWinnerEventId, mark step complete
else no winner yet
Journal->>DB: insert waiting event rows
Journal->>RaceTimeout: schedule timeout work (if timeout)
end
par First event arrives
EventHandler->>DB: insert/send event (state: sent)
EventHandler->>DB: find waiting race step
EventHandler->>DB: mark triggering event consumed
EventHandler->>DB: delete losing waiting events
EventHandler->>DB: set raceWinnerEventId, set runResult, mark step complete
and Timeout fires
RaceTimeout->>DB: load step, set runResult = failed("Timeout")
RaceTimeout->>DB: delete waiting events
RaceTimeout->>DB: mark step complete and re-enqueue
end
DB-->>WorkflowCtx: step completion result
WorkflowCtx-->>Client: resolve Promise with { name, value }
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
8e7e407 to
260357b
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/component/event.ts (1)
192-198: Use astepId-backed index for losing-event cleanup.This query pulls every waiting event for the workflow and then filters
state.stepIdin memory. The timeout path insrc/component/race.tsdoes the same scan, so larger races pay this cost on every resolution/timeout. A composite index that includesstate.stepIdwould keep cleanup scoped to the current race.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/event.ts` around lines 192 - 198, The query building losingEvents currently uses withIndex("workflowId_state") and then filters state.stepId in-memory; create and use a composite index that includes state.stepId (e.g. add or use an index like "workflowId_state_stepId") and change the query in the losingEvents code path to include .withIndex("workflowId_state_stepId", (q) => q.eq("workflowId", workflowId).eq("state.kind", "waiting").eq("state.stepId", step._id)) so the database does the scoping; also apply the same indexed-query change to the timeout path in src/component/race.ts to avoid repeated full-scan + in-memory filtering.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@example/convex/raceEvents.ts`:
- Around line 65-73: The getProposal internalQuery is ignoring args.workflowId
and may return a proposal from a different workflow; after fetching the proposal
with ctx.db.get("proposals", args.proposalId) (inside getProposal), verify that
the proposal exists and that proposal.workflowId === args.workflowId, and if
not, throw an error (e.g., "Proposal not found for workflow" or "Proposal does
not belong to workflow") to prevent cross-workflow access; ensure the check uses
the existing args.workflowId and args.proposalId identifiers and retains the
current error path.
In `@src/client/workflowContext.ts`:
- Around line 263-270: Validate opts.timeout inside raceEvents before calling
run: ensure timeout is a finite non-negative number (typeof number, !isNaN,
isFinite, and >= 0); if validation fails throw a clear synchronous error (e.g.
RangeError) that mentions the offending value. Update the logic around the
run(sender, {... target: { kind: "race", args: { events: ..., timeout:
opts?.timeout, ... } } }) call so opts.timeout is checked and rejected early,
referencing the raceEvents function and the opts.timeout parameter.
In `@src/component/event.ts`:
- Around line 211-216: The race winner record currently persists eventName from
the local variable name which can be supplied by callers (args.name) and thus
misattribute the winning event; change the code that sets
step.step.runResult.returnValue.eventName to use the loaded event's canonical
name (e.g., event.name) or the event row's identifier rather than the external
name variable so send(...eventId...) cannot be overridden—ensure
step.step.raceWinnerEventId remains event._id and that the persisted
returnValue.eventName is derived from the loaded event object (event.name) or
resolved from the event row, not from args.name/local name.
---
Nitpick comments:
In `@src/component/event.ts`:
- Around line 192-198: The query building losingEvents currently uses
withIndex("workflowId_state") and then filters state.stepId in-memory; create
and use a composite index that includes state.stepId (e.g. add or use an index
like "workflowId_state_stepId") and change the query in the losingEvents code
path to include .withIndex("workflowId_state_stepId", (q) => q.eq("workflowId",
workflowId).eq("state.kind", "waiting").eq("state.stepId", step._id)) so the
database does the scoping; also apply the same indexed-query change to the
timeout path in src/component/race.ts to avoid repeated full-scan + in-memory
filtering.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bc1694ed-e667-47dd-a815-05f163c7c99b
⛔ Files ignored due to path filters (3)
example/convex/_generated/api.d.tsis excluded by!**/_generated/**src/component/_generated/api.tsis excluded by!**/_generated/**src/component/_generated/component.tsis excluded by!**/_generated/**
📒 Files selected for processing (11)
example/convex/raceEvents.tsexample/convex/schema.tssrc/client/index.tssrc/client/step.tssrc/client/workflowContext.tssrc/component/event.tssrc/component/journal.tssrc/component/race.tssrc/component/schema.tssrc/component/workflow.tssrc/types.ts
287f144 to
afa8dd3
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/component/journal.ts`:
- Around line 314-325: The timeout path currently enqueues the timeout job with
only { stepId } so when the timeout handler (internal.race.timeout) resumes the
workflow it calls getWorkpool(ctx, {}) and loses per-run workpoolOptions; fix
this by propagating the same workpoolOptions used in the normal completion flow
into the timeout job payload and storing them on step.timeout (e.g., include
workpoolOptions in the object passed to workpool.enqueueMutation and assign
step.timeout.workId and step.timeout.workpoolOptions accordingly) so
internal.race.timeout / getWorkpool(ctx, {}) can restore the original options
when resuming the race.
- Around line 228-239: The query populating the local variable sent (the call
chain starting with ctx.db.query("events").withIndex("workflowId_state", ...)
and the subsequent winner selection) only reads pre-existing "sent" rows but
never marks the chosen winner or loser rows as consumed; update the branch so
that after selecting winner (and any failed competitors) you transition those
event rows to state.kind = "consumed" (using the same DB update/consume logic
used elsewhere, e.g., the existing consume/transition helper or the same state
shape used in src/component/event.ts) so they cannot be re-used by later
awaitEvent/race calls; apply the same fix to the other similar blocks referenced
(lines ~255-269 and ~291-308) where pre-existing "sent" rows are resolved.
In `@src/component/race.ts`:
- Around line 9-12: Replace the hard assertions in the race step lookup with
safe guards that treat stale/missing steps as a no-op: after fetching step with
ctx.db.get("steps", args.stepId) check if step is falsy or step.step.kind !==
"race" or !step.step.inProgress and simply return (no-op) instead of calling
assert; update the logic around the existing step handling in the same function
(the block using step, args.stepId) so downstream behavior only proceeds when
the step exists, is of kind "race", and is inProgress.
In `@src/component/workflow.ts`:
- Around line 434-448: When tearing down a race step (the branch where
entry.step.kind === "race") you must cancel the scheduled timeout referenced by
entry.step.timeout.workId before deleting/settling the step; locate the race
teardown blocks (the one that builds raceEvents and calls
ctx.db.delete("events", ...), sets entry.step.runResult/inProgress/completedAt
and calls ctx.db.replace("steps", entry._id, entry)) and invoke the scheduler
cancellation API with the workId (e.g., call the scheduler cancel function with
entry.step.timeout.workId), then clear/undefine entry.step.timeout so the
timeout cannot later fire (apply the same fix to the other race teardown block
around lines 540-550).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d6c2ec34-9d8d-4998-a506-34ec4e201bad
⛔ Files ignored due to path filters (3)
example/convex/_generated/api.d.tsis excluded by!**/_generated/**src/component/_generated/api.tsis excluded by!**/_generated/**src/component/_generated/component.tsis excluded by!**/_generated/**
📒 Files selected for processing (11)
example/convex/raceEvents.tsexample/convex/schema.tssrc/client/index.tssrc/client/step.tssrc/client/workflowContext.tssrc/component/event.tssrc/component/journal.tssrc/component/race.tssrc/component/schema.tssrc/component/workflow.tssrc/types.ts
🚧 Files skipped from review as they are similar to previous changes (7)
- example/convex/schema.ts
- src/client/index.ts
- src/component/schema.ts
- src/types.ts
- src/client/step.ts
- src/client/workflowContext.ts
- example/convex/raceEvents.ts
There was a problem hiding this comment.
♻️ Duplicate comments (1)
src/client/workflowContext.ts (1)
263-266:⚠️ Potential issue | 🟡 MinorReject non-finite
timeoutvalues here.This now filters out negative/
NaNvalues, butInfinitystill passes and>also coerces non-number inputs. Guard withtypeof/Number.isFinitebefore the range check.Possible fix
- assert( - opts?.timeout === undefined || opts.timeout > 0, - "Timeout must be a positive number.", - ); + if (opts?.timeout !== undefined) { + assert( + typeof opts.timeout === "number" && + Number.isFinite(opts.timeout) && + opts.timeout > 0, + "`timeout` must be a finite positive number.", + ); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/client/workflowContext.ts` around lines 263 - 266, The timeout assertion in workflowContext.ts currently allows Infinity and coerced non-number inputs; update the check on opts?.timeout so it first verifies a numeric finite value (use typeof opts?.timeout === "number" && Number.isFinite(opts.timeout)) and then enforces > 0, while still allowing undefined; modify the assert around opts?.timeout to combine these guards so only positive finite numbers pass.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/client/workflowContext.ts`:
- Around line 263-266: The timeout assertion in workflowContext.ts currently
allows Infinity and coerced non-number inputs; update the check on opts?.timeout
so it first verifies a numeric finite value (use typeof opts?.timeout ===
"number" && Number.isFinite(opts.timeout)) and then enforces > 0, while still
allowing undefined; modify the assert around opts?.timeout to combine these
guards so only positive finite numbers pass.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0a14627a-a753-4be2-a01f-528047cb7f0f
⛔ Files ignored due to path filters (3)
example/convex/_generated/api.d.tsis excluded by!**/_generated/**src/component/_generated/api.tsis excluded by!**/_generated/**src/component/_generated/component.tsis excluded by!**/_generated/**
📒 Files selected for processing (10)
example/convex/raceEvents.tssrc/client/index.tssrc/client/step.tssrc/client/workflowContext.tssrc/component/event.tssrc/component/journal.tssrc/component/race.tssrc/component/schema.tssrc/component/workflow.tssrc/types.ts
✅ Files skipped from review due to trivial changes (1)
- src/component/event.ts
🚧 Files skipped from review as they are similar to previous changes (6)
- src/client/index.ts
- src/component/race.ts
- src/component/schema.ts
- src/client/step.ts
- src/component/workflow.ts
- src/component/journal.ts
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@example/convex/raceEvents.ts`:
- Line 95: The call to ctx.db.patch uses the wrong signature (three args);
change it to the two-argument form expected by the API: call
ctx.db.patch(flow._id, { out: args.result }) instead of ctx.db.patch("flows",
flow._id, { out: args.result }); update the similar uses of ctx.db.patch in this
module (and mirror the same fix you applied in src/component/event.ts) so each
patch call passes the record id first and the fields object second.
In `@src/component/event.ts`:
- Around line 243-245: The call to ctx.db.delete in the losingEvents cleanup is
using the wrong signature (it passes a table name and an id); change the loop
that maps losingEvents to call ctx.db.delete with only the record id (use
losing._id) so Promise.all(losingEvents.map(losing =>
ctx.db.delete(losing._id))) is used; update the code referencing ctx.db.delete,
losingEvents, and losing._id in src/component/event.ts accordingly.
- Around line 242-246: The Convex DB calls use the wrong signatures: update the
calls to use id-first forms used elsewhere — replace ctx.db.replace("steps",
step._id, step) with ctx.db.replace(step._id, step), change the deletion inside
the losingEvents map from ctx.db.delete("events", losing._id) to
ctx.db.delete(losing._id), and change ctx.db.get("workflows", workflowId) to
ctx.db.get(workflowId); these changes target the ctx.db.replace, ctx.db.delete,
and ctx.db.get usages in this block so they match the correct API.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 10c833a1-a37b-4dd3-bc20-a6460bf5f12c
⛔ Files ignored due to path filters (3)
example/convex/_generated/api.d.tsis excluded by!**/_generated/**src/component/_generated/api.tsis excluded by!**/_generated/**src/component/_generated/component.tsis excluded by!**/_generated/**
📒 Files selected for processing (10)
example/convex/raceEvents.tssrc/client/index.tssrc/client/step.tssrc/client/workflowContext.tssrc/component/event.tssrc/component/journal.tssrc/component/race.tssrc/component/schema.tssrc/component/workflow.tssrc/types.ts
🚧 Files skipped from review as they are similar to previous changes (6)
- src/client/index.ts
- src/component/schema.ts
- src/types.ts
- src/component/race.ts
- src/component/journal.ts
- src/component/workflow.ts
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
example/convex/raceEvents.ts (1)
95-95:⚠️ Potential issue | 🔴 CriticalIncorrect
ctx.db.patchAPI signature will cause runtime error.The Convex
ctx.db.patchmethod takes only 2 arguments:ctx.db.patch(id, fields). The table name should not be passed.🐛 Proposed fix
- await ctx.db.patch("flows", flow._id, { out: args.result }); + await ctx.db.patch(flow._id, { out: args.result });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@example/convex/raceEvents.ts` at line 95, The call to ctx.db.patch is using the wrong signature by passing the table name; update the invocation so it uses the 2-argument Convex API: call ctx.db.patch with the document id and the fields object (i.e., replace ctx.db.patch("flows", flow._id, { out: args.result }) with ctx.db.patch(flow._id, { out: args.result })) so the table name is removed and the arguments match ctx.db.patch(id, fields).src/component/event.ts (1)
242-246:⚠️ Potential issue | 🔴 CriticalIncorrect Convex DB API signatures will cause runtime errors.
Lines 242, 244, and 246 use incorrect API signatures by passing table names as the first argument. The correct Convex DB API signatures are:
ctx.db.replace(id, doc)— notctx.db.replace(tableName, id, doc)ctx.db.delete(id)— notctx.db.delete(tableName, id)ctx.db.get(id)— notctx.db.get(tableName, id)This is inconsistent with correct usages elsewhere in this file (e.g., lines 73, 155, 173).
🐛 Proposed fix
- await ctx.db.replace("steps", step._id, step); + await ctx.db.replace(step._id, step); await Promise.all( - losingEvents.map((losing) => ctx.db.delete("events", losing._id)), + losingEvents.map((losing) => ctx.db.delete(losing._id)), ); - const workflow = await ctx.db.get("workflows", workflowId); + const workflow = await ctx.db.get(workflowId);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/component/event.ts` around lines 242 - 246, The calls to Convex DB use the wrong API shape: update the three calls so they pass only ids and docs as required by ctx.db.replace(id, doc), ctx.db.delete(id), and ctx.db.get(id). Specifically, change the ctx.db.replace usage to pass step._id and step (not "steps"), change the Promise.all deletion to call ctx.db.delete(losing._id) for each losing event (not "events", losing._id), and change the get to ctx.db.get(workflowId) (not "workflows", workflowId) so they match other correct usages in this file.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/component/event.ts`:
- Around line 182-190: The call to ctx.db.patch is using the wrong signature
(passing the table name); change the call in the event consumption block to use
the two-argument Convex API: call ctx.db.patch(event._id, { state: { kind:
"consumed", stepId: step._id, waitingAt: event.state.waitingAt, sentAt:
Date.now(), consumedAt: Date.now() } }); i.e., remove the "events" table
argument so ctx.db.patch is invoked consistently with other usages (see other
ctx.db.patch calls in this file).
In `@src/component/journal.ts`:
- Around line 260-262: The deletion loop uses the wrong Convex API signature:
update the Promise.all call that iterates losingEvents so it calls ctx.db.delete
with only the record id (losing._id) rather than two args; locate the block
using losingEvents and the ctx.db.delete call in the journal logic and replace
ctx.db.delete("events", losing._id) with ctx.db.delete(losing._id) so it matches
the Convex ctx.db.delete(id) signature.
---
Duplicate comments:
In `@example/convex/raceEvents.ts`:
- Line 95: The call to ctx.db.patch is using the wrong signature by passing the
table name; update the invocation so it uses the 2-argument Convex API: call
ctx.db.patch with the document id and the fields object (i.e., replace
ctx.db.patch("flows", flow._id, { out: args.result }) with
ctx.db.patch(flow._id, { out: args.result })) so the table name is removed and
the arguments match ctx.db.patch(id, fields).
In `@src/component/event.ts`:
- Around line 242-246: The calls to Convex DB use the wrong API shape: update
the three calls so they pass only ids and docs as required by ctx.db.replace(id,
doc), ctx.db.delete(id), and ctx.db.get(id). Specifically, change the
ctx.db.replace usage to pass step._id and step (not "steps"), change the
Promise.all deletion to call ctx.db.delete(losing._id) for each losing event
(not "events", losing._id), and change the get to ctx.db.get(workflowId) (not
"workflows", workflowId) so they match other correct usages in this file.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4c438d79-ea06-47fb-ac1b-c8f2c0f6afe8
⛔ Files ignored due to path filters (3)
example/convex/_generated/api.d.tsis excluded by!**/_generated/**src/component/_generated/api.tsis excluded by!**/_generated/**src/component/_generated/component.tsis excluded by!**/_generated/**
📒 Files selected for processing (10)
example/convex/raceEvents.tssrc/client/index.tssrc/client/step.tssrc/client/workflowContext.tssrc/component/event.tssrc/component/journal.tssrc/component/race.tssrc/component/schema.tssrc/component/workflow.tssrc/types.ts
✅ Files skipped from review due to trivial changes (1)
- src/client/index.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- src/component/race.ts
- src/component/workflow.ts
4736f46 to
d7cb966
Compare
|
@ianmacartney can this help with #27? |
in progress
Add
step.raceEventsfor waiting on multiple eventsThis PR introduces
step.raceEvents, a new step type that lets a workflow waitfor any one of multiple events to fire. The first matching event resolves
the race and its name and value are returned as a typed discriminated union.
What's included
step.raceEvents(events, opts?)— new workflow step that blocks until oneof the specified events is sent, or a timeout fires.
"fail"(default),"retry", and"discard"tocontrol how error events are handled.
timeout— schedules a timeout via the workpool that fails therace if no event arrives in time.
union; narrowing on
result.namegives you the correctresult.valuetype.racestep kind in the schema, journal, and component layers.example/convex/raceEvents.ts.README.md.Design decisions
Event-based API, not arbitrary predicates. For more nested or complicated
use cases (e.g. combining events with sub-workflow results, complex state
checks), users should bring their own separate workpool/workflow instances.
Those scenarios typically need more control and observability over individual
processes. An event-based race keeps the API simple and composable while
delegating advanced orchestration to user code.
timeoutis built-in despite the above. Timeout is the exception becauseit is fundamental, common, and simple enough to implement at this level. More
importantly, it provides runtime safety for the
"retry"failure mode, whichotherwise waits indefinitely unless it receives a success event. Without a
built-in timeout,
"retry"would be dangerous in practice.Failure modes (
"fail"/"retry"/"discard"). These three modescover the main use cases: accept the first event regardless, skip errors and
wait for success, or skip errors and exhaust remaining events. The default
(
"fail") matches the existingawaitEventbehavior where an error eventthrows in the handler.
Missing
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Summary by CodeRabbit
New Features
Chores