Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

## [Unreleased]

### Added

- **Patterns docs section** at `docs/patterns/`, sibling to Concepts. Seeded with four recipes drawn from downstream usage and proposal 0008's alternatives section: parameterized entry point, tool-dispatch-as-node, session-as-checkpoint-resume, and bypass-if-output-exists. Patterns are user-level how-to recipes composing existing primitives, not framework contracts; new patterns can be added without spec coordination. Each page follows a problem / approach / snippet / when this is the right pattern / when it isn't / cross-references structure.

### Notes

- **Pinned spec version bumped to v0.17.1.** Proposal 0019 (multi-provider wire-format extension) reframes llm-provider §8 as a catalog of wire-format mappings, with the existing OpenAI-compatible body nested under §8.1. Purely textual on the spec side — no behavioral change, no fixture changes. Code and doc references to §8.X updated to match the new structure (§8.1 → §8.1.1, §8.2 → §8.1.2, §8.3 → §8.1.3, §8.5.1 → §8.1.5.1, §8.1.1 → §8.1.1.1). All existing conformance fixtures continue to pass.
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/checkpointing.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Register at build time via `with_checkpointer`:
```python
from openarmature.checkpoint import SQLiteCheckpointer

checkpointer = SQLiteCheckpointer(db_path="./checkpoints.db")
checkpointer = SQLiteCheckpointer(path="./checkpoints.db")

graph = (
GraphBuilder(MyState)
Expand Down
110 changes: 110 additions & 0 deletions docs/patterns/bypass-if-output-exists.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Bypass-if-output-exists

**Problem.** How do I skip a node whose external output already
exists?

## Approach

A small custom [middleware](../concepts/middleware.md) wraps the
node. Before calling `next_(state)`, the middleware checks "does
my output already exist?" (a filesystem file, a database row, a
content-addressable store entry). If yes, it returns the cached
output as the partial update directly. If no, it calls `next_`
and returns the result.

The node sees its normal `(state) → partial_update` contract.
The middleware is the only thing that knows about idempotency;
all callers of the node compose with it cleanly.

## Snippet

```python
import os
from collections.abc import Mapping
from typing import Any
from openarmature.graph import GraphBuilder, NextCall, State


class BypassIfRendered:
"""Skip the node if its rendered output already exists on disk."""

def __init__(self, output_field: str, key_field: str, root: str):
self.output_field = output_field
self.key_field = key_field
self.root = root

async def __call__(
self, state: Any, next_: NextCall
) -> Mapping[str, Any]:
key = getattr(state, self.key_field)
path = f"{self.root}/{key}.bin"
if os.path.exists(path):
with open(path, "rb") as f:
return {self.output_field: f.read()}
partial = await next_(state)
# ... persist partial[self.output_field] to path here, or
# have the node itself write the file ...
return partial


class RenderState(State):
scene_id: str
rendered_frame: bytes = b""


builder = (
GraphBuilder(RenderState)
.add_node(
"render",
render_frame_fn,
middleware=[
BypassIfRendered(
output_field="rendered_frame",
key_field="scene_id",
root="./renders",
)
],
)
# ... rest of graph ...
)
```

The middleware composes with the framework's
[four registration sites](../concepts/middleware.md): attach it
per-node (as above), per-graph, per-branch, or
per-fan-out-instance, depending on the scope of the bypass.

## When this is the right pattern

- The node's work is expensive and idempotent given the same key
(rendering a frame, calling an external API with content-
addressable output, downloading a file).
- The "does it exist" check is cheap (a filesystem `stat`, a
Redis `EXISTS`, a database key lookup).
- You're OK with the node being skipped silently — the partial
update returned by the middleware is indistinguishable from a
successful node run.

## When it isn't

- The check itself is expensive enough that you'd rather just run
the node. The cost model inverts; the pattern is wrong.
- You need to *force* re-execution on demand (cache invalidation).
Add a `force_rerun: bool` field on state that the middleware
consults — but if you're doing that often, the bypass logic
belongs in the node itself, gated on a state field, not in
middleware.
- The cached output's freshness depends on inputs the middleware
can't see (downstream state, time-of-day, etc.). Use a
dedicated caching layer instead of reimplementing cache
invalidation in the middleware.

## Cross-references

- [Middleware](../concepts/middleware.md) — middleware shape, the
four registration sites, composition.
- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/)

This pattern is explicitly called out in proposal 0008's
*Alternatives considered* section as a userland recipe rather than
spec'd behavior — this page is its canonical home.
35 changes: 35 additions & 0 deletions docs/patterns/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Patterns

Recipes for things people keep asking the framework to do but
that compose cleanly from existing primitives.

The split between [Concepts](../concepts/index.md) and Patterns is
intentional: Concepts explain *what OpenArmature is* — typed state,
nodes, edges, middleware, checkpointing, observers. Patterns
explain *ways to use it* — opinionated shapes for common
downstream questions like "how do I run an agent loop?" or "how do
I skip work that's already been done?".

## When to read which

- You don't know what a `State` is, or how nodes and edges fit
together → start with [Concepts](../concepts/index.md).
- You know the primitives but you're asking "how do I do X with
them?" → look here.

Patterns are user-level recipes, not framework contracts. New
patterns can be added without spec coordination — they're how-to
docs composing existing primitives.

## The catalog

- [Parameterized entry point](parameterized-entry-point.md) —
start the graph at an arbitrary node via state-driven routing.
- [Tool-dispatch-as-node](tool-dispatch-as-node.md) — model an
agent tool-call loop as a graph cycle.
- [Session-as-checkpoint-resume](session-as-checkpoint-resume.md) —
carry multi-turn agent state across turns using the existing
checkpointer.
- [Bypass-if-output-exists](bypass-if-output-exists.md) —
short-circuit a node whose external output already exists, via
middleware.
100 changes: 100 additions & 0 deletions docs/patterns/parameterized-entry-point.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Parameterized entry point

**Problem.** How do I start the graph at an arbitrary node?

## Approach

You don't. Make the "entry point" a state-level parameter instead.
A first router node passes through, and a
[conditional edge](../concepts/composition.md) routes to wherever
execution should begin. The graph stays a single graph; what
differs across runs is which branch the conditional edge takes.

Combine with [checkpointing](../concepts/checkpointing.md) if you
want resume-style behavior — skip nodes whose work is already
captured in state.

## Snippet

```python
from openarmature.graph import END, EndSentinel, GraphBuilder, State


class MissionState(State):
starting_stage: str = "plan" # "plan" | "execute" | "report"
plan: str = ""
execution_log: str = ""
report: str = ""


def route_from_starting_stage(s: MissionState) -> str | EndSentinel:
return s.starting_stage


async def router(s: MissionState) -> dict:
return {} # no state change; conditional edge below routes


async def plan(s: MissionState) -> dict:
return {
"plan": "Apollo-style free-return trajectory.",
"starting_stage": "execute",
}


async def execute(s: MissionState) -> dict:
return {"execution_log": "Burn complete. Trajectory nominal."}


async def report(s: MissionState) -> dict:
return {"report": "Mission objectives met."}


builder = (
GraphBuilder(MissionState)
.add_node("router", router)
.add_node("plan", plan)
.add_node("execute", execute)
.add_node("report", report)
.add_conditional_edge("router", route_from_starting_stage)
.add_edge("plan", "execute")
.add_edge("execute", "report")
.add_edge("report", END)
.set_entry("router")
)
graph = builder.compile()

# Start at the beginning:
await graph.invoke(MissionState())

# Or skip straight to execute, with the plan already in state:
await graph.invoke(MissionState(starting_stage="execute", plan="..."))
```

The caller pre-populates `starting_stage` (and any prerequisite
fields the chosen branch needs) and the graph routes accordingly.

## When this is the right pattern

- You have a few canonical entry points and the choice between
them is data, not control flow.
- You want to skip work already done in a prior run — combine with
[checkpointing](../concepts/checkpointing.md) to pick up where
you left off.
- Your "different entry points" share state structure and most of
the downstream graph.

## When it isn't

- "Start at node X" really means "run a different pipeline." Then
it's a different compiled graph. Don't bend one graph into two;
two graphs are easier to test and reason about.
- The number of entry points grows unboundedly. Then you're
reimplementing routing — consider a higher-level dispatch layer
that picks which graph to invoke.

## Cross-references

- [Composition: conditional edges](../concepts/composition.md)
- [Checkpointing](../concepts/checkpointing.md)
- Spec: [graph-engine](https://openarmature.org/capabilities/graph-engine/)
113 changes: 113 additions & 0 deletions docs/patterns/session-as-checkpoint-resume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Session-as-checkpoint-resume

**Problem.** How do I keep multi-turn agent state across turns?

## Approach

The framework's [checkpointing](../concepts/checkpointing.md)
provides single-invocation crash resume out of the box. Multi-turn
state is the same primitive used differently: the application
keeps a stable `session_id → invocation_id` mapping, and each
turn calls `invoke(resume_invocation=<prior_invocation_id>)` to
pick up where the previous turn left off.

The checkpointer returns the prior state. The new turn proceeds
from there. Session-context fields that accumulate across turns
(message history, retrieved facts, running totals) use a `merge`
or `append` reducer so each turn's contribution adds to what's
already there rather than replacing it.

Each resume mints a new `invocation_id`; the `session_id` is the
join key the application maintains, typically as the
`correlation_id` on `invoke()` (which is preserved unchanged
across resume).

## Snippet

```python
from typing import Annotated
from openarmature.checkpoint import SQLiteCheckpointer
from openarmature.graph import END, GraphBuilder, State, append, merge
from openarmature.llm import Message


class SessionState(State):
messages: Annotated[list[Message], append] = []
facts: Annotated[dict[str, str], merge] = {}
Comment thread
chris-colinsky marked this conversation as resolved.
Outdated
last_user_input: str = ""


# ... define nodes that read s.messages, append to s.messages,
# and merge into s.facts ...

checkpointer = SQLiteCheckpointer(path="./sessions.db")
graph = (
GraphBuilder(SessionState)
.add_node("plan", plan)
.add_node("respond", respond)
.add_edge("plan", "respond")
.add_edge("respond", END)
.set_entry("plan")
.with_checkpointer(checkpointer)
.compile()
)


# The application maintains its own session table mapping
# session_id -> latest invocation_id. OA's checkpointer doesn't
# know about sessions; the join is the application's
# responsibility. The session_id doubles as correlation_id so
# observability traces share the cross-turn join key.
async def handle_turn(session_id: str, user_input: str) -> str:
initial = SessionState(last_user_input=user_input)
prior_invocation_id = sessions_db.get_invocation_id(session_id)

if prior_invocation_id is None:
final = await graph.invoke(initial, correlation_id=session_id)
else:
final = await graph.invoke(
initial, resume_invocation=prior_invocation_id
)

# Record the new invocation_id for next turn's resume.
# Read it from the checkpointer's latest record for this
# correlation_id; exact lookup is application-side bookkeeping.
sessions_db.set_invocation_id(session_id, latest_for(session_id))

return final.messages[-1].content
```

`sessions_db` is your application's session-state store (Postgres,
Redis, a flat file, whatever); the checkpointer holds the OA-side
state and the session table holds the join keys.

## When this is the right pattern

- Your application has long-lived sessions with multiple LLM turns
and you want the prior state to be the starting point of the
next turn.
- You're already running a checkpointer for crash resume — this
pattern is "use it more."
- Cross-turn state has clean reducer semantics: `merge` for
accumulating dicts, `append` for growing lists.

## When it isn't

- A session's "state" is bigger than fits comfortably in a single
graph state shape. Split into multiple graphs and share an
external store keyed by session.
- Turns are completely independent — there's no value in carrying
state across them. Then just run each turn as a fresh invoke.
- The application already has its own state-management layer that
conflicts with OA's frozen-state model. Use OA per-turn without
cross-turn resume.

## Cross-references

- [Checkpointing](../concepts/checkpointing.md) — backend wiring,
`resume_invocation`, schema migration.
- [State and reducers](../concepts/state-and-reducers.md) — `merge`
and `append` reducer strategies.
- [`examples/08-checkpointing-and-migration`](../examples/08-checkpointing-and-migration.md) —
single-resume baseline.
- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/)
Loading