-
Notifications
You must be signed in to change notification settings - Fork 96
feat: temporal deployment #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
xavidop
wants to merge
2
commits into
genkit-ai:main
Choose a base branch
from
xavidop:xavier/temporal
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+266
−0
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,265 @@ | ||
| --- | ||
| title: Deploy durable flows with Temporal | ||
| description: Learn how to run Genkit flows as durable Temporal Workflows using the genkitx-temporal plugin, with automatic retries, durable history, timeouts, cancellation, and operational visibility. | ||
| supportedLanguages: | ||
| - js | ||
| --- | ||
|
|
||
| ## Running Genkit flows as Temporal Workflows | ||
|
|
||
| The [`genkitx-temporal`](https://github.com/xavidop/genkitx-temporal) plugin lets | ||
| you execute any Genkit flow inside a [Temporal](https://temporal.io/) Workflow, | ||
| transparently. You keep writing flows exactly the way you always have, and the | ||
| plugin takes care of: | ||
|
|
||
| - Registering the flow so a Temporal Worker can pick it up. | ||
| - Wrapping each execution in a deterministic Temporal Workflow. | ||
| - Running the non-deterministic LLM/tool/RAG work inside a Temporal Activity, | ||
| so retries and timeouts are safe. | ||
| - Providing helpers to start workflows from any client. | ||
|
|
||
| Under the hood, the plugin ships a generic, deterministic workflow called | ||
| `runGenkitFlow` that invokes a single Temporal Activity called | ||
| `runGenkitFlowActivity`. The activity looks up your Genkit flow by name in an | ||
| in-process registry and runs it inside a full Node environment, where all the | ||
| messy, non-deterministic things (network calls, model calls, tool calls, RAG | ||
| lookups) are perfectly fine. | ||
|
|
||
| ```text | ||
| ┌────────────┐ start workflow ┌──────────────────────┐ | ||
| │ Client │ ────────────────▶ │ Temporal Server │ | ||
| └────────────┘ └─────────┬────────────┘ | ||
| │ task | ||
| ▼ | ||
| ┌──────────────────────┐ | ||
| │ Worker process │ | ||
| │ ┌────────────────┐ │ | ||
| │ │ runGenkitFlow │ │ (workflow, sandboxed) | ||
| │ │ │ │ │ | ||
| │ │ ▼ │ │ | ||
| │ │ runGenkit- │ │ (activity, full Node) | ||
| │ │ FlowActivity │ │ | ||
| │ │ │ │ │ | ||
| │ │ ▼ │ │ | ||
| │ │ your Genkit │ │ | ||
| │ │ flow (LLM…) │ │ | ||
| │ └────────────────┘ │ | ||
| └──────────────────────┘ | ||
| ``` | ||
|
|
||
| This split is the whole trick. Temporal Workflows must be deterministic so that | ||
| they can be replayed from event history after a crash, deploy, or scale event. | ||
| LLM calls obviously are not deterministic. Putting the LLM work inside an | ||
| Activity is the canonical Temporal pattern, and the plugin does it for you so | ||
| you don't have to think about it. | ||
|
|
||
| ## Why Temporal and Genkit are a great match | ||
|
|
||
| Genkit gives you the authoring primitives; Temporal gives you the runtime | ||
| guarantees. Together: | ||
|
|
||
| - **Automatic retries for transient errors.** LLM providers regularly return | ||
| `429`s and `5xx`s. Tool calls hit the network. Vector stores time out. | ||
| Temporal lets you express retry policies (exponential backoff, max attempts, | ||
| non-retryable error types) declaratively, applied to every flow execution. | ||
| - **Durable history.** Every event in your flow's execution is persisted by the | ||
| Temporal Server. If your Worker pod is killed mid-flow, another Worker picks | ||
| up the workflow exactly where it left off, with all prior activity results | ||
| intact. No partial state, no double-charging the user, no orphan jobs. | ||
| - **Timeouts, heartbeats, and cancellation.** Long-running agents that browse, | ||
| plan, and call tools for minutes are a nightmare to control with plain HTTP. | ||
| Temporal models start-to-close, schedule-to-close, and heartbeat timeouts as | ||
| first-class concepts, plus explicit cancellation semantics you can wire to a | ||
| user closing a tab. | ||
| - **Operational visibility out of the box.** The Temporal UI gives you a | ||
| per-execution timeline of every workflow and activity. You can see inputs, | ||
| outputs, retries, failures, and stack traces, search by workflow id, and | ||
| terminate or signal running workflows. | ||
| - **Horizontal scalability for free.** Need more throughput? Run more Worker | ||
| processes pointing at the same task queue. The Temporal Server load-balances | ||
| workflows and activities across them. Your flows do not need to change. | ||
| - **Long-running, human-in-the-loop friendly.** Temporal workflows can sleep | ||
| for days, wait for signals from external systems, and resume cleanly. Perfect | ||
| for agents that need approval before executing a sensitive tool, or RAG | ||
| pipelines that wait for a document indexing job to finish. | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| - [Node.js](https://nodejs.org/) >= 20 | ||
| - A running Temporal Server. For local development, the easiest path is the | ||
| bundled dev server: | ||
|
|
||
| ```bash | ||
| brew install temporal | ||
| temporal server start-dev | ||
| ``` | ||
|
|
||
| This starts a Temporal Server on `localhost:7233` and the UI on | ||
| `http://localhost:8233`. | ||
|
|
||
| For production, you can use [Temporal Cloud](https://temporal.io/cloud) or a | ||
| self-hosted cluster. | ||
|
|
||
| ## Installation | ||
|
|
||
| ```bash | ||
| npm install genkitx-temporal genkit | ||
| ``` | ||
|
|
||
| The Temporal SDK packages are peer-installed automatically as dependencies of | ||
| the plugin. | ||
|
|
||
| ## Usage end to end | ||
|
|
||
| The plugin exposes a small, focused API. Three things are enough to ship a | ||
| durable Genkit flow: define it with the Temporal-aware helper, start a Worker, | ||
| and execute it from a client. | ||
|
|
||
| ### 1. Define a flow with `defineTemporalFlow` | ||
|
|
||
| `defineTemporalFlow` is a drop-in replacement for `ai.defineFlow`. The returned | ||
| object is a normal Genkit flow, so you can still call it directly, expose it via | ||
| the Developer UI, run it from tests, and so on. The only difference is that it | ||
| is also registered internally so a Temporal Worker can find it by name. | ||
|
|
||
| ```typescript | ||
| // flows.ts | ||
| import { genkit, z } from 'genkit'; | ||
| import { googleAI } from '@genkit-ai/google-genai'; | ||
| import { defineTemporalFlow, temporal } from 'genkitx-temporal'; | ||
|
|
||
| export const ai = genkit({ | ||
| plugins: [ | ||
| googleAI(), | ||
| temporal({ taskQueue: 'my-queue' }), | ||
| ], | ||
| model: googleAI.model('gemini-flash-latest'), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The model References
|
||
| }); | ||
|
|
||
| export const jokeFlow = defineTemporalFlow( | ||
| ai, | ||
| { | ||
| name: 'jokeFlow', | ||
| inputSchema: z.string(), | ||
| outputSchema: z.string(), | ||
| }, | ||
| async (subject) => { | ||
| const { text } = await ai.generate(`Tell me a joke about ${subject}`); | ||
| return text; | ||
| }, | ||
| ); | ||
| ``` | ||
|
|
||
| A few things to notice: | ||
|
|
||
| - You configure the plugin like any other Genkit plugin, passing the Temporal | ||
| task queue (and optionally `address`, `namespace`, etc.). | ||
| - The flow body is plain Genkit code. No Temporal-specific imports, no | ||
| `proxyActivities`, no determinism gymnastics. The plugin handles all that for | ||
| you. | ||
| - The flow's `name` is also its Temporal registration key. Keep it unique | ||
| within your Worker. | ||
|
|
||
| ### 2. Start a Worker | ||
|
|
||
| Workers are the processes that actually execute your flows. A Worker imports | ||
| your flows (so the registry is populated) and then calls `startTemporalWorker`: | ||
|
|
||
| ```typescript | ||
| // worker.ts | ||
| import './flows'; // side-effect import: registers the flows | ||
| import { startTemporalWorker } from 'genkitx-temporal'; | ||
|
|
||
| startTemporalWorker({ taskQueue: 'my-queue' }).catch((e) => { | ||
| console.error(e); | ||
| process.exit(1); | ||
| }); | ||
| ``` | ||
|
|
||
| ```bash | ||
| node ./dist/worker.js | ||
| ``` | ||
|
|
||
| You can run as many Worker processes as you want against the same task queue. | ||
| Temporal will distribute work across them automatically. If a Worker dies | ||
| mid-flow, another one picks up. | ||
|
|
||
| ### 3. Execute a flow as a Temporal Workflow | ||
|
|
||
| From any client (an HTTP handler, a CLI, a cron job, another workflow), use | ||
| `executeTemporalFlow` to start a workflow and `await` its result: | ||
|
|
||
| ```typescript | ||
| // client.ts | ||
| import { executeTemporalFlow } from 'genkitx-temporal'; | ||
| import { jokeFlow } from './flows'; | ||
|
|
||
| const result = await executeTemporalFlow(jokeFlow, 'cats', { | ||
| taskQueue: 'my-queue', | ||
| }); | ||
| console.log(result); | ||
| ``` | ||
|
|
||
| If you don't want to block on the result, use `startTemporalFlow` instead. It | ||
| returns the raw Temporal `WorkflowHandle`, which lets you query, signal, or | ||
| cancel the running workflow later. This is the building block for | ||
| human-in-the-loop scenarios, scheduled flows, fan-out/fan-in patterns, and so | ||
| on. | ||
|
|
||
| ### Configuration | ||
|
|
||
| `temporal(options)` and every helper accept the same connection options. | ||
| Anything you don't pass falls back to environment variables, then to sensible | ||
| defaults: | ||
|
|
||
| | Option | Environment variable | Default | | ||
| | ----------- | ----------------------- | ---------------- | | ||
| | `address` | `TEMPORAL_ADDRESS` | `localhost:7233` | | ||
| | `namespace` | `TEMPORAL_NAMESPACE` | `default` | | ||
| | `taskQueue` | `TEMPORAL_TASK_QUEUE` | `genkit` | | ||
|
|
||
| This makes it straightforward to run the same code locally against a dev server | ||
| and in production against Temporal Cloud or a self-hosted cluster. | ||
|
|
||
| ### Advanced: combine with your own workflows and activities | ||
|
|
||
| The bundled `runGenkitFlow` workflow is enough for the common case. If you want | ||
| to mix Genkit flows with your own existing Temporal workflows and activities, | ||
| the plugin lets you bring your own: | ||
|
|
||
| ```typescript | ||
| await startTemporalWorker({ | ||
| taskQueue: 'my-queue', | ||
| workflowsPath: require.resolve('./my-workflows'), | ||
| activities: { ...require('./my-activities') }, | ||
| }); | ||
| ``` | ||
|
|
||
| ```typescript | ||
| // my-activities.ts | ||
| export { runGenkitFlowActivity } from 'genkitx-temporal/activities'; | ||
| export async function myOtherActivity(/* ... */) { | ||
| /* ... */ | ||
| } | ||
| ``` | ||
|
|
||
| Re-exporting `runGenkitFlowActivity` keeps the built-in workflow working, so | ||
| you can compose Genkit flows alongside hand-written activities for the parts of | ||
| your system that aren't AI. | ||
|
|
||
| ## API summary | ||
|
|
||
| The full public surface is intentionally tiny: | ||
|
|
||
| - `temporal(options?)` — the Genkit plugin. | ||
| - `defineTemporalFlow(ai, config, fn)` — define a flow and register it for | ||
| Temporal execution. | ||
| - `startTemporalWorker(options?)` — start a Worker process. | ||
| - `executeTemporalFlow(flow, input, options?)` — run a flow inside a Workflow | ||
| and await the result. | ||
| - `startTemporalFlow(flow, input, options?)` — same, but returns the | ||
| `WorkflowHandle` for fire-and-forget / signalling. | ||
| - `runGenkitFlowActivity` — the underlying activity, re-exported so you can | ||
| combine it with your own activities. | ||
| - `registerTemporalFlow(name, flow)` — manually register a flow that was | ||
| defined elsewhere (useful when wrapping flows you don't own). | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the general rules, links to plugin documentation should use the official Genkit website URL (e.g.,
genkit.dev/plugins/...) instead of third-party or personal URLs like GitHub.References