Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

## [Unreleased]

### Added

- **Patterns docs section** at `docs/patterns/`, sibling to Concepts. Seeded with four recipes drawn from downstream usage and proposal 0008's alternatives section: parameterized entry point, tool-dispatch-as-node, session-as-checkpoint-resume, and bypass-if-output-exists. Patterns are user-level how-to recipes composing existing primitives, not framework contracts; new patterns can be added without spec coordination. Each page follows a problem / approach / snippet / when this is the right pattern / when it isn't / cross-references structure.

### Notes

- **Pinned spec version bumped to v0.17.1.** Proposal 0019 (multi-provider wire-format extension) reframes llm-provider §8 as a catalog of wire-format mappings, with the existing OpenAI-compatible body nested under §8.1. Purely textual on the spec side — no behavioral change, no fixture changes. Code and doc references to §8.X updated to match the new structure (§8.1 → §8.1.1, §8.2 → §8.1.2, §8.3 → §8.1.3, §8.5.1 → §8.1.5.1, §8.1.1 → §8.1.1.1). All existing conformance fixtures continue to pass.
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/checkpointing.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Register at build time via `with_checkpointer`:
```python
from openarmature.checkpoint import SQLiteCheckpointer

checkpointer = SQLiteCheckpointer(db_path="./checkpoints.db")
checkpointer = SQLiteCheckpointer(path="./checkpoints.db")

graph = (
GraphBuilder(MyState)
Expand Down
110 changes: 110 additions & 0 deletions docs/patterns/bypass-if-output-exists.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Bypass-if-output-exists

**Problem.** How do I skip a node whose external output already
exists?

## Approach

A small custom [middleware](../concepts/middleware.md) wraps the
node. Before calling `next_(state)`, the middleware checks "does
my output already exist?" (a filesystem file, a database row, a
content-addressable store entry). If yes, it returns the cached
output as the partial update directly. If no, it calls `next_`
and returns the result.

The node sees its normal `(state) → partial_update` contract.
The middleware is the only thing that knows about idempotency;
all callers of the node compose with it cleanly.

## Snippet

```python
import os
from collections.abc import Mapping
from typing import Any
from openarmature.graph import GraphBuilder, NextCall, State


class BypassIfRendered:
"""Skip the node if its rendered output already exists on disk."""

def __init__(self, output_field: str, key_field: str, root: str):
self.output_field = output_field
self.key_field = key_field
self.root = root

async def __call__(
self, state: Any, next_: NextCall
) -> Mapping[str, Any]:
key = getattr(state, self.key_field)
path = f"{self.root}/{key}.bin"
if os.path.exists(path):
with open(path, "rb") as f:
return {self.output_field: f.read()}
partial = await next_(state)
# ... persist partial[self.output_field] to path here, or
# have the node itself write the file ...
return partial


class RenderState(State):
scene_id: str
rendered_frame: bytes = b""


builder = (
GraphBuilder(RenderState)
.add_node(
"render",
render_frame_fn,
middleware=[
BypassIfRendered(
output_field="rendered_frame",
key_field="scene_id",
root="./renders",
)
],
)
# ... rest of graph ...
)
```

The middleware composes with the framework's
[four registration sites](../concepts/middleware.md): attach it
per-node (as above), per-graph, per-branch, or
per-fan-out-instance, depending on the scope of the bypass.

## When this is the right pattern

- The node's work is expensive and idempotent given the same key
(rendering a frame, calling an external API with content-
addressable output, downloading a file).
- The "does it exist" check is cheap (a filesystem `stat`, a
Redis `EXISTS`, a database key lookup).
- You're OK with the node being skipped silently — the partial
update returned by the middleware is indistinguishable from a
successful node run.

## When it isn't

- The check itself is expensive enough that you'd rather just run
the node. The cost model inverts; the pattern is wrong.
- You need to *force* re-execution on demand (cache invalidation).
Add a `force_rerun: bool` field on state that the middleware
consults — but if you're doing that often, the bypass logic
belongs in the node itself, gated on a state field, not in
middleware.
- The cached output's freshness depends on inputs the middleware
can't see (downstream state, time-of-day, etc.). Use a
dedicated caching layer instead of reimplementing cache
invalidation in the middleware.

## Cross-references

- [Middleware](../concepts/middleware.md) — middleware shape, the
four registration sites, composition.
- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/)

This pattern is explicitly called out in proposal 0008's
*Alternatives considered* section as a userland recipe rather than
spec'd behavior — this page is its canonical home.
35 changes: 35 additions & 0 deletions docs/patterns/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Patterns

Recipes for things people keep asking the framework to do but
that compose cleanly from existing primitives.

The split between [Concepts](../concepts/index.md) and Patterns is
intentional: Concepts explain *what OpenArmature is* — typed state,
nodes, edges, middleware, checkpointing, observers. Patterns
explain *ways to use it* — opinionated shapes for common
downstream questions like "how do I run an agent loop?" or "how do
I skip work that's already been done?".

## When to read which

- You don't know what a `State` is, or how nodes and edges fit
together → start with [Concepts](../concepts/index.md).
- You know the primitives but you're asking "how do I do X with
them?" → look here.

Patterns are user-level recipes, not framework contracts. New
patterns can be added without spec coordination — they're how-to
docs composing existing primitives.

## The catalog

- [Parameterized entry point](parameterized-entry-point.md) —
start the graph at an arbitrary node via state-driven routing.
- [Tool-dispatch-as-node](tool-dispatch-as-node.md) — model an
agent tool-call loop as a graph cycle.
- [Session-as-checkpoint-resume](session-as-checkpoint-resume.md) —
carry multi-turn agent state across turns using the existing
checkpointer.
- [Bypass-if-output-exists](bypass-if-output-exists.md) —
short-circuit a node whose external output already exists, via
middleware.
100 changes: 100 additions & 0 deletions docs/patterns/parameterized-entry-point.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Parameterized entry point

**Problem.** How do I start the graph at an arbitrary node?

## Approach

You don't. Make the "entry point" a state-level parameter instead.
A first router node passes through, and a
[conditional edge](../concepts/composition.md) routes to wherever
execution should begin. The graph stays a single graph; what
differs across runs is which branch the conditional edge takes.

Combine with [checkpointing](../concepts/checkpointing.md) if you
want resume-style behavior — skip nodes whose work is already
captured in state.

## Snippet

```python
from openarmature.graph import END, EndSentinel, GraphBuilder, State


class MissionState(State):
starting_stage: str = "plan" # "plan" | "execute" | "report"
plan: str = ""
execution_log: str = ""
report: str = ""


def route_from_starting_stage(s: MissionState) -> str | EndSentinel:
return s.starting_stage


async def router(s: MissionState) -> dict:
return {} # no state change; conditional edge below routes


async def plan(s: MissionState) -> dict:
return {
"plan": "Apollo-style free-return trajectory.",
"starting_stage": "execute",
}


async def execute(s: MissionState) -> dict:
return {"execution_log": "Burn complete. Trajectory nominal."}


async def report(s: MissionState) -> dict:
return {"report": "Mission objectives met."}


builder = (
GraphBuilder(MissionState)
.add_node("router", router)
.add_node("plan", plan)
.add_node("execute", execute)
.add_node("report", report)
.add_conditional_edge("router", route_from_starting_stage)
.add_edge("plan", "execute")
.add_edge("execute", "report")
.add_edge("report", END)
.set_entry("router")
)
graph = builder.compile()

# Start at the beginning:
await graph.invoke(MissionState())

# Or skip straight to execute, with the plan already in state:
await graph.invoke(MissionState(starting_stage="execute", plan="..."))
```

The caller pre-populates `starting_stage` (and any prerequisite
fields the chosen branch needs) and the graph routes accordingly.

## When this is the right pattern

- You have a few canonical entry points and the choice between
them is data, not control flow.
- You want to skip work already done in a prior run — combine with
[checkpointing](../concepts/checkpointing.md) to pick up where
you left off.
- Your "different entry points" share state structure and most of
the downstream graph.

## When it isn't

- "Start at node X" really means "run a different pipeline." Then
it's a different compiled graph. Don't bend one graph into two;
two graphs are easier to test and reason about.
- The number of entry points grows unboundedly. Then you're
reimplementing routing — consider a higher-level dispatch layer
that picks which graph to invoke.

## Cross-references

- [Composition: conditional edges](../concepts/composition.md)
- [Checkpointing](../concepts/checkpointing.md)
- Spec: [graph-engine](https://openarmature.org/capabilities/graph-engine/)
114 changes: 114 additions & 0 deletions docs/patterns/session-as-checkpoint-resume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Session-as-checkpoint-resume

**Problem.** How do I keep multi-turn agent state across turns?

## Approach

The framework's [checkpointing](../concepts/checkpointing.md)
provides single-invocation crash resume out of the box. Multi-turn
state is the same primitive used differently: the application
keeps a stable `session_id → invocation_id` mapping, and each
turn calls `invoke(resume_invocation=<prior_invocation_id>)` to
pick up where the previous turn left off.

The checkpointer returns the prior state. The new turn proceeds
from there. Session-context fields that accumulate across turns
(message history, retrieved facts, running totals) use a `merge`
or `append` reducer so each turn's contribution adds to what's
already there rather than replacing it.

Each resume mints a new `invocation_id`; the `session_id` is the
join key the application maintains, typically as the
`correlation_id` on `invoke()` (which is preserved unchanged
across resume).

## Snippet

```python
from typing import Annotated
from pydantic import Field
from openarmature.checkpoint import SQLiteCheckpointer
from openarmature.graph import END, GraphBuilder, State, append, merge
from openarmature.llm import Message


class SessionState(State):
messages: Annotated[list[Message], append] = Field(default_factory=list)
facts: Annotated[dict[str, str], merge] = Field(default_factory=dict)
last_user_input: str = ""


# ... define nodes that read s.messages, append to s.messages,
# and merge into s.facts ...

checkpointer = SQLiteCheckpointer(path="./sessions.db")
graph = (
GraphBuilder(SessionState)
.add_node("plan", plan)
.add_node("respond", respond)
.add_edge("plan", "respond")
.add_edge("respond", END)
.set_entry("plan")
.with_checkpointer(checkpointer)
.compile()
)


# The application maintains its own session table mapping
# session_id -> latest invocation_id. OA's checkpointer doesn't
# know about sessions; the join is the application's
# responsibility. The session_id doubles as correlation_id so
# observability traces share the cross-turn join key.
async def handle_turn(session_id: str, user_input: str) -> str:
initial = SessionState(last_user_input=user_input)
prior_invocation_id = sessions_db.get_invocation_id(session_id)

if prior_invocation_id is None:
final = await graph.invoke(initial, correlation_id=session_id)
else:
final = await graph.invoke(
initial, resume_invocation=prior_invocation_id
)

# Record the new invocation_id for next turn's resume.
# Read it from the checkpointer's latest record for this
# correlation_id; exact lookup is application-side bookkeeping.
sessions_db.set_invocation_id(session_id, latest_for(session_id))

return final.messages[-1].content
```

`sessions_db` is your application's session-state store (Postgres,
Redis, a flat file, whatever); the checkpointer holds the OA-side
state and the session table holds the join keys.

## When this is the right pattern

- Your application has long-lived sessions with multiple LLM turns
and you want the prior state to be the starting point of the
next turn.
- You're already running a checkpointer for crash resume — this
pattern is "use it more."
- Cross-turn state has clean reducer semantics: `merge` for
accumulating dicts, `append` for growing lists.

## When it isn't

- A session's "state" is bigger than fits comfortably in a single
graph state shape. Split into multiple graphs and share an
external store keyed by session.
- Turns are completely independent — there's no value in carrying
state across them. Then just run each turn as a fresh invoke.
- The application already has its own state-management layer that
conflicts with OA's frozen-state model. Use OA per-turn without
cross-turn resume.

## Cross-references

- [Checkpointing](../concepts/checkpointing.md) — backend wiring,
`resume_invocation`, schema migration.
- [State and reducers](../concepts/state-and-reducers.md) — `merge`
and `append` reducer strategies.
- [`examples/08-checkpointing-and-migration`](../examples/08-checkpointing-and-migration.md) —
single-resume baseline.
- Spec: [pipeline-utilities](https://openarmature.org/capabilities/pipeline-utilities/)
Loading