Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions example/convex/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@ export const exampleWorkflow = workflow.define({
console.timeLog("weather", temperature);
// Wait a beat before writing the result.
await step.sleep(100, { name: "cooldown" });
await step.runMutation(internal.example.updateFlow, {
workflowId: step.workflowId,
out: { name, celsius, farenheit, windSpeed, windGust },
await step.run(async (ctx) => {
const flow = await ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", step.workflowId))
.first();
if (flow) {
await ctx.db.patch(flow._id, {
out: { name, celsius, farenheit, windSpeed, windGust },
});
}
});
console.timeEnd("overall");
return { name, celsius, farenheit, windSpeed, windGust };
},
internalMutation,
workpoolOptions: {
retryActionsByDefault: true,
},
Expand Down Expand Up @@ -183,24 +191,3 @@ export const getWeather = internalAction({
};
},
});

export const updateFlow = internalMutation({
args: {
workflowId: vWorkflowId,
out: v.any(),
},
returns: v.null(),
handler: async (ctx, args) => {
const flow = await ctx.db
.query("flows")
.withIndex("workflowId", (q) => q.eq("workflowId", args.workflowId))
.first();
if (!flow) {
console.warn(`Flow not found: ${args.workflowId}`);
return;
}
await ctx.db.patch(flow._id, {
out: args.out,
});
},
});
26 changes: 24 additions & 2 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
type GenericDataModel,
type GenericMutationCtx,
type GenericQueryCtx,
type MutationBuilder,
type PaginationOptions,
type PaginationResult,
type RegisteredMutation,
Expand Down Expand Up @@ -80,14 +81,34 @@ export type CallbackOptions = {
export type WorkflowDefinition<
ArgsValidator extends PropertyValidators,
ReturnsValidator extends Validator<any, "required", any> | void = any,
DataModel extends GenericDataModel = GenericDataModel,
> = {
args?: ArgsValidator;
handler: (
step: WorkflowCtx,
step: WorkflowCtx<DataModel>,
args: ObjectType<ArgsValidator>,
) => Promise<ReturnValueForOptionalValidator<ReturnsValidator>>;
returns?: ReturnsValidator;
workpoolOptions?: WorkpoolRetryOptions;
/**
* Provide your app's `internalMutation` (from `_generated/server`) to get
* a fully typed `ctx` in `ctx.run()` handlers, with your data model's
* tables available on `ctx.db`. This also lets any custom middleware
* you've configured run around the workflow.
*
* ```ts
* import { internalMutation } from "./_generated/server";
* workflow.define({
* internalMutation,
* handler: async (ctx, args) => {
* const user = await ctx.run(async (ctx) => {
* return ctx.db.query("users").first(); // fully typed
* });
* },
* });
* ```
*/
internalMutation?: MutationBuilder<DataModel, "internal">;
};

export type WorkflowStatus =
Expand All @@ -113,8 +134,9 @@ export class WorkflowManager {
define<
ArgsValidator extends PropertyValidators,
ReturnsValidator extends Validator<unknown, "required", string> | void,
DataModel extends GenericDataModel = GenericDataModel,
>(
workflow: WorkflowDefinition<ArgsValidator, ReturnsValidator>,
workflow: WorkflowDefinition<ArgsValidator, ReturnsValidator, DataModel>,
): RegisteredMutation<
"internal",
{
Expand Down
115 changes: 77 additions & 38 deletions src/client/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ export type StepRequest = {
| {
kind: "sleep";
args: Record<string, never>;
}
| {
kind: "inline";
handler: (
ctx: GenericMutationCtx<GenericDataModel>,
) => Promise<unknown>;
args: Record<string, never>;
};
retry: RetryBehavior | boolean | undefined;
inline: boolean;
Expand Down Expand Up @@ -128,11 +135,13 @@ export class StepExecutor {
const stepJson = JSON.stringify(
convexToJson(pick(entry.step, ["name", "args", "kind"])),
);
const messageKind =
message.target.kind === "inline" ? "function" : message.target.kind;
const messageJson = JSON.stringify(
convexToJson({
name: message.name,
args: message.target.args as Value,
kind: message.target.kind,
kind: messageKind,
}),
);
if (stepJson !== messageJson) {
Expand All @@ -154,30 +163,47 @@ export class StepExecutor {

let runResult: RunResult | undefined;
if (message.inline) {
if (target.kind !== "function" || target.functionType === "action") {
if (target.kind === "inline") {
try {
const returnValue = (await target.handler(this.ctx)) ?? null;
runResult = { kind: "success", returnValue };
} catch (error: unknown) {
runResult = {
kind: "failed",
error: formatErrorWithStack(error),
};
}
} else if (
target.kind === "function" &&
target.functionType !== "action"
) {
try {
const result =
target.functionType === "query"
? await this.ctx.runQuery(
target.function as FunctionReference<
typeof target.functionType
>,
target.args,
)
: await this.ctx.runMutation(
target.function as FunctionReference<
typeof target.functionType
>,
target.args,
);
runResult = { kind: "success", returnValue: result ?? null };
} catch (error: unknown) {
runResult = {
kind: "failed",
error: formatErrorWithStack(error),
};
}
} else {
throw new Error(
"Inline execution is only supported for queries and mutations.",
"Inline execution is only supported for queries, mutations, and inline handlers.",
);
}
try {
const result =
target.functionType === "query"
? await this.ctx.runQuery(
target.function as FunctionReference<
typeof target.functionType
>,
target.args,
)
: await this.ctx.runMutation(
target.function as FunctionReference<
typeof target.functionType
>,
target.args,
);
runResult = { kind: "success", returnValue: result ?? null };
} catch (error: unknown) {
runResult = { kind: "failed", error: formatErrorWithStack(error) };
}
}

const commonFields = {
Expand All @@ -190,30 +216,43 @@ export class StepExecutor {
completedAt: runResult ? this.now : undefined,
} satisfies Omit<Step, "kind">;
const step =
target.kind === "function"
target.kind === "inline"
? {
kind: "function" as const,
functionType: target.functionType,
handle: await createFunctionHandle(target.function),
functionType: "mutation" as const,
handle: "inline",
...commonFields,
}
: target.kind === "workflow"
: target.kind === "function"
? {
kind: "workflow" as const,
kind: "function" as const,
functionType: target.functionType,
handle: await createFunctionHandle(target.function),
...commonFields,
}
: target.kind === "event"
? {
kind: "event" as const,
eventId: target.args.eventId,
...commonFields,
args: target.args,
}
: {
kind: "sleep" as const,
...commonFields,
};
: target.kind === "workflow"
? {
kind: "workflow" as const,
handle: await createFunctionHandle(target.function),
...commonFields,
}
: target.kind === "event"
? {
kind: "event" as const,
eventId: target.args.eventId,
...commonFields,
args: target.args,
}
: target.kind === "sleep"
? {
kind: "sleep" as const,
...commonFields,
}
: ((): never => {
throw new Error(
`Unknown step kind: ${(target as any).kind}`,
);
})();
return {
retry: message.retry,
schedulerOptions: message.schedulerOptions,
Expand Down
108 changes: 106 additions & 2 deletions src/client/stepContext.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ function journalEntry(
overrides: {
name?: string;
kind?: "function" | "workflow" | "event";
functionType?: "query" | "mutation" | "action";
handle?: string;
args?: Record<string, unknown>;
runResult?: RunResult;
stepNumber?: number;
Expand Down Expand Up @@ -50,8 +52,8 @@ function journalEntry(
...base,
step: {
kind: "function",
functionType: "action",
handle: "handle",
functionType: overrides.functionType ?? "action",
handle: overrides.handle ?? "handle",
...stepCommon,
},
} as unknown as JournalEntry;
Expand Down Expand Up @@ -343,4 +345,106 @@ describe("StepExecutor + WorkflowCtx integration", () => {

expect(result).toEqual([1, 2, 3]);
});

it("ctx.run replays a successful inline handler", async () => {
const channel = new BaseChannel<StepRequest>(0);
const ctx = createWorkflowCtx("wf-11" as any, channel);

const entry = journalEntry({
name: "run",
functionType: "mutation",
handle: "inline",
args: {},
runResult: { kind: "success", returnValue: 99 },
});

const [result] = await Promise.all([
ctx.run(async () => 99),
replayFromJournal(channel, [entry]),
]);

expect(result).toBe(99);
});

it("ctx.run replays a failed inline handler", async () => {
const channel = new BaseChannel<StepRequest>(0);
const ctx = createWorkflowCtx("wf-12" as any, channel);

const entry = journalEntry({
name: "run",
functionType: "mutation",
handle: "inline",
args: {},
runResult: { kind: "failed", error: "inline boom" },
});

const [error] = await Promise.all([
ctx.run(async () => {
throw new Error("inline boom");
}).catch((e: Error) => e),
replayFromJournal(channel, [entry]),
]);

expect(error).toBeInstanceOf(Error);
expect((error as Error).message).toBe("inline boom");
});

it("ctx.run uses custom name when provided", async () => {
const channel = new BaseChannel<StepRequest>(0);
const ctx = createWorkflowCtx("wf-13" as any, channel);

const entry = journalEntry({
name: "myCustomStep",
functionType: "mutation",
handle: "inline",
args: {},
runResult: { kind: "success", returnValue: "named" },
});

const handler = async () => {
return ctx.run(async () => "named", { name: "myCustomStep" });
};

const [result] = await Promise.all([
handler(),
replayFromJournal(channel, [entry]),
]);

expect(result).toBe("named");
});

it("ctx.run works sequentially with other steps", async () => {
const channel = new BaseChannel<StepRequest>(0);
const ctx = createWorkflowCtx("wf-14" as any, channel);

const entries = [
journalEntry({
name: "run",
functionType: "mutation",
handle: "inline",
args: {},
runResult: { kind: "success", returnValue: "inline-result" },
stepNumber: 0,
}),
journalEntry({
name: "step2",
args: {},
runResult: { kind: "success", returnValue: "action-result" },
stepNumber: 1,
}),
];

const handler = async () => {
const a = await ctx.run(async () => "inline-result");
const b = await ctx.runAction(fakeFuncRef("step2") as any, {});
return [a, b];
};

const [results] = await Promise.all([
handler(),
replayFromJournal(channel, entries),
]);

expect(results).toEqual(["inline-result", "action-result"]);
});
});
Loading
Loading