Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

### Added

- **Parallel branches (proposal 0011, introduced in spec v0.11.0; attempt-index propagation clarified in spec v0.16.1).** New `GraphBuilder.add_parallel_branches_node(name, *, branches, error_policy, errors_field, middleware)` surface dispatches M heterogeneous compiled subgraphs concurrently per pipeline-utilities §11. `BranchSpec` (subgraph + inputs/outputs projection + branch middleware) and `ParallelBranchesNode` types exported from `openarmature.graph`. Branch insertion order determines fan-in merge order regardless of completion timing (§11.8). Two error policies: `"fail_fast"` raises `ParallelBranchesBranchFailed` (a `NodeException` subtype) with `branch_name`, original cause as `__cause__`, and `recoverable_state` carrying the parent's pre-dispatch snapshot — no buffered branch contributions are visible (§11.5 buffer-and-apply). `"collect"` records per-branch failures in an optional `errors_field` (each record carries `branch_name` + `category` + implementation-defined extras) and continues. Two new error categories: `ParallelBranchesNoBranches` (compile time, empty branches map) and `ParallelBranchesBranchFailed` (runtime, fail_fast branch raise).
- **`NodeEvent.branch_name: str | None` (proposal 0011 / graph-engine §6).** Populated on events from nodes inside a parallel-branches branch, absent outside. Independent of `fan_out_index` — both may be present simultaneously when a branch contains a fan-out (or a fan-out instance contains a parallel-branches node). The combined `(namespace, branch_name, fan_out_index, attempt_index, phase)` tuple is the event-source uniqueness key.
- **`openarmature.branch_name` OTel span attribute.** Mirrors the existing `openarmature.node.fan_out_index`. Emitted on synthesized inner-node spans when `branch_name` is populated on the event. The two attributes coexist on inner nodes of a fan-out-inside-a-branch composition.
- **Attempt-index ContextVar propagation through transitive retry (graph-engine §6 v0.16.1).** Retry middleware now sets the `attempt_index` ContextVar before each `next` call; the engine reads `current_attempt_index()` when emitting events. This makes retry semantics symmetric across direct (per-node middleware) and transitive (instance / branch / fan-out instance_middleware) wrapping — events from inner nodes of a subgraph the retry re-invokes carry the wrapping retry's counter, not a freshly-zeroed inner counter. Innermost-wins precedence falls out of Python's ContextVar set/reset token stack. Pre-existing node-level retry behavior is unchanged.
- **State migration for checkpointed graphs (proposal 0014, introduced in spec v0.15.0; refined by proposal 0018 in spec v0.16.0).** Saved checkpoints whose `schema_version` doesn't match the current state class now route through a registered migration chain instead of failing on resume. Surface: `State.schema_version: ClassVar[str] = ""` (declare a non-empty value to opt in), `GraphBuilder.with_state_migration(from_version, to_version, migrate)` and `with_state_migrations(*migrations)` for registration, `StateMigration` and `MigrationRegistry` types exported from `openarmature.checkpoint`. Chain resolution is BFS over the registered edges; the shortest path wins. Three new error categories: `CheckpointStateMigrationChainAmbiguous` (proposal 0018: duplicate `(from, to)` pair at registration time, or multiple distinct shortest paths between the saved and current versions at resume time), `CheckpointStateMigrationMissing` (no chain bridges the versions), and `CheckpointStateMigrationFailed` (a migration function raised). All non-transient. Post-migration deserialization failures still route to `CheckpointRecordInvalid` per §10.12.4. The same chain applies to each entry in `parent_states` in lockstep with the outer state per §10.12.2. Routing precedence per §10.10 (v0.16.0): chain-ambiguous → missing → failed → record-invalid.
- **`Checkpointer.supports_state_migration` Protocol attribute.** Marks whether a backend can expose the structural intermediate form (a plain dict, JSON tree) the migration registry consumes. `SQLiteCheckpointer(serialization="json")` opts in; `SQLiteCheckpointer(serialization="pickle")` and `InMemoryCheckpointer` opt out. On version mismatch against a non-migration-eligible backend the engine raises `CheckpointRecordInvalid` per spec §10.12.1.
- **`openarmature.checkpoint.migrate` OTel span (proposal 0014 §6 cross-ref).** Versioned resumes whose migration chain runs emit a zero-duration `openarmature.checkpoint.migrate` span on the OTel observer, parented under the invocation root span. Attributes: `openarmature.checkpoint.migrate.from_version`, `openarmature.checkpoint.migrate.to_version` (the final target), `openarmature.checkpoint.migrate.chain_length`. The §10.12.3 fast path (versions match, registry not consulted) emits no span. Engine-side: a synthetic `checkpoint_migrated` observer phase carries a `_MigrationSummary` payload from `_migrate_record` through to the OTel observer; the new phase is gated off default subscriptions (observers opt in explicitly via `phases={..., "checkpoint_migrated"}`).
Expand All @@ -25,13 +29,13 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

### Changed

- **Pinned spec version: 0.10.0 → 0.16.0.** Adopts the skip-ahead governance principle: the submodule jumps across v0.11.0–v0.16.0 (proposals 0009, 0011, 0014, 0015, 0016, 0017, 0018) in one bump. Only the surfaces introduced by proposals 0014–0017 are implemented in the batch's release; fixtures from 0011 are deferred-skip in the conformance suite and unmark with PR-5.
- **Pinned spec version: 0.10.0 → 0.16.1.** Adopts the skip-ahead governance principle: the submodule jumps across v0.11.0–v0.16.1 (proposals 0009, 0011, 0014, 0015, 0016, 0017, 0018) in one bump. All five proposals (0011, 0014, 0015, 0016, 0017) are implemented in the batch's release; the v0.16.1 clarification of attempt-index propagation through transitive retry middleware lands with the proposal 0011 implementation.
- **`CheckpointRecord.schema_version` semantic shift (proposal 0014).** Previously a backend-internal record-shape version (`CHECKPOINT_SCHEMA_VERSION = "1"` constant), now the user-facing state-schema version per spec §10.2. The framework reads `type(state).schema_version` at save time. Pre-PR-4 records carrying `"1"` are reinterpreted as user-facing v1 identifiers; users with such records either declare `schema_version="1"` on their state class or discard the pre-PR-4 records. `SQLiteCheckpointer` no longer rejects records with non-default `schema_version` at the backend boundary; version-mismatch routing is now an engine concern at resume time. The `CHECKPOINT_SCHEMA_VERSION` module constant is removed; future record-shape evolution can add backend-private metadata fields if needed.
- **`NodeEvent.pre_state` typed `Any` (was `State`).** Required by the new `checkpoint_migrated` phase which carries a `_MigrationSummary` payload rather than a `State` instance. Observer authors who type-narrowed `pre_state` to `State` should treat it as `Any` and narrow per-phase (e.g., `if event.phase == "completed": ...`). The `checkpoint_saved` phase already carried a State-flavored shape (not necessarily a typed `State` subclass instance), so this widens the declared type to match runtime reality rather than introducing a new constraint.

### Notes

- **Release gate: do not tag until all of {0011, 0014, 0015, 0016, 0017} are merged.** This batch implements one proposal per PR and lands a consolidated release after the fifth PR. Cutting a release tag before the batch is complete would ship a partial spec implementation against the v0.15.0 pin.
- **Release gate cleared with PR-5 (proposal 0011).** All five proposals in the batch ({0011, 0014, 0015, 0016, 0017}) are now implemented. Tag the consolidated release once this PR merges.
- **Pre-1.0 MINOR.** Existing free-form callers (no `response_schema`) see no behavior change — the new field defaults to `None`, the wire body omits `response_format`, and `Response.parsed` remains absent.

## [0.5.0] — 2026-05-10
Expand Down
3 changes: 3 additions & 0 deletions docs/concepts/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ the framework, or jump to whichever concept you need.
data seam.
- [Fan-out](fan-out.md): running the same subgraph many times in
parallel, results merged back deterministically.
- [Parallel branches](parallel-branches.md): dispatching M
heterogeneous subgraphs concurrently with per-branch state schemas
and middleware.
- [LLMs](llms.md): how LLM calls fit into nodes, structured output,
routing on parsed fields, errors at the LLM boundary.
- [Observability](observability.md): node-boundary hooks, OTel mapping,
Expand Down
18 changes: 17 additions & 1 deletion docs/concepts/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class NodeEvent:
attempt_index: int = 0
fan_out_index: int | None = None
fan_out_config: FanOutEventConfig | None = None
branch_name: str | None = None
```

A walk-through:
Expand Down Expand Up @@ -130,7 +131,11 @@ A walk-through:
`len(parent_states) == len(namespace) - 1`.

- **`attempt_index`**: 0-based retry attempt counter. `0` for nodes
not wrapped by retry middleware; `1+` for retries.
not wrapped by retry middleware; `1+` for retries. Retry middleware
may wrap transitively — a retry on a [parallel-branches
branch](parallel-branches.md) or fan-out `instance_middleware`
re-runs the whole subgraph; events from inner nodes carry the
wrapping retry's attempt counter.

- **`fan_out_index`**: 0-based per-instance index for events inside
a fan-out instance; `None` outside.
Expand All @@ -140,6 +145,17 @@ A walk-through:
`item_count` / `concurrency` / `error_policy` / `parent_node_name`.
`None` on every other event.

- **`branch_name`**: populated on events from nodes inside a
[parallel-branches branch](parallel-branches.md), carrying the
branch's name as declared on the dispatcher. `None` outside.
Independent of `fan_out_index` — both may be present simultaneously
when a parallel-branches branch contains a fan-out (or a fan-out
instance contains a parallel-branches node). The combination
`(namespace, branch_name, fan_out_index, attempt_index, phase)`
uniquely identifies each event source. On the OTel mapping
side, an `openarmature.branch_name` span attribute is added in
parallel to the existing `openarmature.node.fan_out_index`.

## Routing errors and the completed event

When a conditional edge raises or returns an invalid target:
Expand Down
151 changes: 151 additions & 0 deletions docs/concepts/parallel-branches.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Parallel branches

Dispatch M heterogeneous subgraphs concurrently, projected outputs
merged back into the parent via the parent's reducers in branch
insertion order.

Sibling to [fan-out](fan-out.md) (same `for each thing, do work in
parallel` shape), but the *thing* is different per branch: a research
subgraph, a categorize subgraph, a sentiment subgraph — each with its
own state schema, its own middleware, its own observer events —
running in parallel and joining their results into one parent state.

## When to reach for parallel branches

The signal: a fixed set of named operations, each with its own
behavior and state schema, that don't depend on each other. Three
classifiers running independently against the same input. A research
step, a translate step, and a fact-check step that all want the
parent's prompt. M is known at build time and small (typically 2–6),
and each branch is its own subgraph because each has its own
internal pipeline worth modelling separately.

Fan-out is the right pick when you have N similar pieces of work,
N depends on runtime state, and the work is the same across instances.
Parallel branches is the right pick when M is a small fixed set of
different operations that happen to run concurrently.

## The shape

```python
from openarmature.graph import BranchSpec, GraphBuilder

builder.add_parallel_branches_node(
"dispatcher",
branches={
"research": BranchSpec(
subgraph=research_subgraph, # CompiledGraph[ResearchState]
inputs={"question": "prompt"}, # subgraph_field -> parent_field
outputs={"facts": "facts"}, # parent_field -> subgraph_field
),
"translate": BranchSpec(
subgraph=translate_subgraph, # CompiledGraph[TranslateState]
inputs={"source": "prompt"},
outputs={"translation": "translated"},
),
"fact_check": BranchSpec(
subgraph=fact_check_subgraph, # CompiledGraph[FactCheckState]
inputs={"claim": "prompt"},
outputs={"verdict": "verdict"},
),
},
error_policy="fail_fast", # or "collect"
)
```

Each branch's `subgraph` is a compiled graph; `inputs` and `outputs`
mirror the explicit projection shape from
[composition](composition.md#explicitmapping-declarative). The
branches dict's key is the branch name — used as the branch identity
on observer events (see [observability](observability.md)) and in
the per-branch error records that `error_policy: "collect"`
produces.

## Per-branch state, inputs and outputs

Each branch runs its own subgraph against its own state — heterogeneous
schemas are explicit. Subgraph fields named in `inputs` are seeded
from the parent's corresponding field at branch entry; other subgraph
fields take their schema defaults. At branch exit, only the parent
fields named in `outputs` receive contributions; the rest of the
branch's final state is discarded.

When two branches contribute to the same parent field, the parent's
reducer for that field applies both values in **branch insertion
order** — first the branch declared first in the `branches` dict,
then the next, and so on. This is deterministic regardless of which
branch's inner work finishes first.

## Error policy

- **`"fail_fast"`** (default): the first branch failure cancels
the in-flight siblings and propagates as
`ParallelBranchesBranchFailed` (a `NodeException` subtype) carrying
the failing `branch_name` and the original cause as `__cause__`.
`recoverable_state` is the parent's snapshot at the moment the
dispatcher entered — **no buffered branch contributions are
applied**, including those of branches that successfully completed
before the failure. Buffer-and-apply semantics: contributions are
held until every branch finishes, then either all apply (success)
or none apply (fail_fast failure).
- **`"collect"`**: every branch runs to completion. Successful
branches' contributions merge in insertion order; failed branches'
`outputs` projections do NOT fire (their named parent fields stay
at their defaults). If you declare `errors_field` on the dispatcher,
each failed branch produces a record with at minimum
`{"branch_name": <name>, "category": <category>}` appended to that
parent list field; the implementation may include additional keys
(message, cause_type) and tests should match by the spec-mandated
keys rather than strict equality.

## Branch middleware

Each `BranchSpec` accepts a `middleware` tuple — middlewares that
wrap that branch's whole subgraph invocation as a unit. Retry
middleware on a branch retries the **whole branch**: a fresh
subgraph invocation each time, fresh inner-node execution. The
wrapping retry's attempt counter propagates to events emitted from
inner nodes (per graph-engine §6 v0.16.1), so observer events
inside the branch correctly show `attempt_index` ticking across
retries.

Branch middleware is independent across branches — branch A may
have `[retry, timing]`; branch B may have `[]`; branch C may have
some custom breaker. Each branch's chain composes in isolation.

## Composition with other constructs

Parallel branches compose with the rest of the engine the way
subgraphs and fan-outs do:

- A branch's subgraph can itself contain a fan-out node — inner-node
events inside that fan-out carry **both** `branch_name` (this
branch) and `fan_out_index` (the instance within this branch).
The two fields are independent.
- The parallel-branches node itself can be invoked from inside a
fan-out instance — inner events then carry the outer fan-out's
`fan_out_index` and the inner branch's `branch_name`.
- Per-graph and per-node middleware on the parallel-branches node
wrap the dispatcher as a single unit — one `started` event before
dispatch begins, one `completed` event after all branches finish
and fan-in lands. The parent's retry middleware retries the **whole
parallel-branches node**, not individual branches.

## Resume semantics

Parallel-branches nodes use the same **atomic restart** model as
fan-out (per spec §10.7): if a checkpoint resume lands on a
parallel-branches node, all branches re-dispatch from scratch.
Per-branch progress is not individually persisted in v1.

## When parallel branches is NOT the right shape

- **Not the same as N copies of one subgraph.** If you want "run
this subgraph for each item in a list," reach for
[fan-out](fan-out.md).
- **Not a router.** A router is a conditional-edge pattern — pick
one branch based on state. Parallel branches runs *all* branches
concurrently.
- **Not a coordinator.** Branches don't communicate with each other
during execution; if branch B's work depends on branch A's
output, you want a linear pipeline (A → B), not parallel branches.
2 changes: 1 addition & 1 deletion openarmature-spec
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
Specification = "https://github.com/LunarCommand/openarmature-spec"

[tool.openarmature]
spec_version = "0.16.0"
spec_version = "0.16.1"

[dependency-groups]
dev = [
Expand Down
2 changes: 1 addition & 1 deletion src/openarmature/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""OpenArmature — workflow framework for LLM pipelines and tool-calling agents."""

__version__ = "0.5.0"
__spec_version__ = "0.16.0"
__spec_version__ = "0.16.1"
7 changes: 7 additions & 0 deletions src/openarmature/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
MultipleOutgoingEdges,
NoDeclaredEntry,
NodeException,
ParallelBranchesBranchFailed,
ParallelBranchesNoBranches,
ReducerError,
RoutingError,
RuntimeGraphError,
Expand All @@ -45,6 +47,7 @@
)
from .nodes import FunctionNode, Node
from .observer import Observer, RemoveHandle, SubscribedObserver
from .parallel_branches import BranchSpec, ParallelBranchesNode
from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
from .reducers import Reducer, append, last_write_wins, merge
from .state import State
Expand Down Expand Up @@ -79,6 +82,10 @@
"NodeException",
"NoDeclaredEntry",
"Observer",
"ParallelBranchesBranchFailed",
"ParallelBranchesNoBranches",
"ParallelBranchesNode",
"BranchSpec",
"ProjectionStrategy",
"Reducer",
"ReducerError",
Expand Down
Loading