Merged
Changes from 4 commits
8 changes: 6 additions & 2 deletions CHANGELOG.md
@@ -8,6 +8,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

### Added

- **Parallel branches (proposal 0011, introduced in spec v0.11.0; attempt-index propagation clarified in spec v0.16.1).** New `GraphBuilder.add_parallel_branches_node(name, *, branches, error_policy, errors_field, middleware)` surface dispatches M heterogeneous compiled subgraphs concurrently per pipeline-utilities §11. `BranchSpec` (subgraph + inputs/outputs projection + branch middleware) and `ParallelBranchesNode` types exported from `openarmature.graph`. Branch insertion order determines fan-in merge order regardless of completion timing (§11.8). Two error policies: `"fail_fast"` raises `ParallelBranchesBranchFailed` (a `NodeException` subtype) with `branch_name`, original cause as `__cause__`, and `recoverable_state` carrying the parent's pre-dispatch snapshot — no buffered branch contributions are visible (§11.5 buffer-and-apply). `"collect"` records per-branch failures in an optional `errors_field` (each record carries `branch_name` + `category` + implementation-defined extras) and continues. Two new error categories: `ParallelBranchesNoBranches` (compile time, empty branches map) and `ParallelBranchesBranchFailed` (runtime, fail_fast branch raise).
- **`NodeEvent.branch_name: str | None` (proposal 0011 / graph-engine §6).** Populated on events from nodes inside a parallel-branches branch, absent outside. Independent of `fan_out_index` — both may be present simultaneously when a branch contains a fan-out (or a fan-out instance contains a parallel-branches node). The combined `(namespace, branch_name, fan_out_index, attempt_index, phase)` tuple is the event-source uniqueness key.
- **`openarmature.branch_name` OTel span attribute.** Mirrors the existing `openarmature.node.fan_out_index`. Emitted on synthesized inner-node spans when `branch_name` is populated on the event. The two attributes coexist on inner nodes of a fan-out-inside-a-branch composition.
- **Attempt-index ContextVar propagation through transitive retry (graph-engine §6 v0.16.1).** Retry middleware now sets the `attempt_index` ContextVar before each `next` call; the engine reads `current_attempt_index()` when emitting events. This makes retry semantics symmetric across direct (per-node middleware) and transitive (instance / branch / fan-out instance_middleware) wrapping — events from inner nodes of a subgraph the retry re-invokes carry the wrapping retry's counter, not a freshly-zeroed inner counter. Innermost-wins precedence falls out of Python's ContextVar set/reset token stack. Pre-existing node-level retry behavior is unchanged.
- **State migration for checkpointed graphs (proposal 0014, introduced in spec v0.15.0; refined by proposal 0018 in spec v0.16.0).** Saved checkpoints whose `schema_version` doesn't match the current state class now route through a registered migration chain instead of failing on resume. Surface: `State.schema_version: ClassVar[str] = ""` (declare a non-empty value to opt in), `GraphBuilder.with_state_migration(from_version, to_version, migrate)` and `with_state_migrations(*migrations)` for registration, `StateMigration` and `MigrationRegistry` types exported from `openarmature.checkpoint`. Chain resolution is BFS over the registered edges; the shortest path wins. Three new error categories: `CheckpointStateMigrationChainAmbiguous` (proposal 0018: duplicate `(from, to)` pair at registration time, or multiple distinct shortest paths between the saved and current versions at resume time), `CheckpointStateMigrationMissing` (no chain bridges the versions), and `CheckpointStateMigrationFailed` (a migration function raised). All non-transient. Post-migration deserialization failures still route to `CheckpointRecordInvalid` per §10.12.4. The same chain applies to each entry in `parent_states` in lockstep with the outer state per §10.12.2. Routing precedence per §10.10 (v0.16.0): chain-ambiguous → missing → failed → record-invalid.
- **`Checkpointer.supports_state_migration` Protocol attribute.** Marks whether a backend can expose the structural intermediate form (a plain dict, JSON tree) the migration registry consumes. `SQLiteCheckpointer(serialization="json")` opts in; `SQLiteCheckpointer(serialization="pickle")` and `InMemoryCheckpointer` opt out. On version mismatch against a non-migration-eligible backend the engine raises `CheckpointRecordInvalid` per spec §10.12.1.
- **`openarmature.checkpoint.migrate` OTel span (proposal 0014 §6 cross-ref).** Versioned resumes whose migration chain runs emit a zero-duration `openarmature.checkpoint.migrate` span on the OTel observer, parented under the invocation root span. Attributes: `openarmature.checkpoint.migrate.from_version`, `openarmature.checkpoint.migrate.to_version` (the final target), `openarmature.checkpoint.migrate.chain_length`. The §10.12.3 fast path (versions match, registry not consulted) emits no span. Engine-side: a synthetic `checkpoint_migrated` observer phase carries a `_MigrationSummary` payload from `_migrate_record` through to the OTel observer; the new phase is gated off default subscriptions (observers opt in explicitly via `phases={..., "checkpoint_migrated"}`).
@@ -25,13 +29,13 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

### Changed

- **Pinned spec version: 0.10.0 → 0.16.0.** Adopts the skip-ahead governance principle: the submodule jumps across v0.11.0–v0.16.0 (proposals 0009, 0011, 0014, 0015, 0016, 0017, 0018) in one bump. Only the surfaces introduced by proposals 0014–0017 are implemented in the batch's release; fixtures from 0011 are deferred-skip in the conformance suite and unmark with PR-5.
- **Pinned spec version: 0.10.0 → 0.16.1.** Adopts the skip-ahead governance principle: the submodule jumps across v0.11.0–v0.16.1 (proposals 0009, 0011, 0014, 0015, 0016, 0017, 0018) in one bump. All five proposals (0011, 0014, 0015, 0016, 0017) are implemented in the batch's release; the v0.16.1 clarification of attempt-index propagation through transitive retry middleware lands with the proposal 0011 implementation.
- **`CheckpointRecord.schema_version` semantic shift (proposal 0014).** Previously a backend-internal record-shape version (`CHECKPOINT_SCHEMA_VERSION = "1"` constant), now the user-facing state-schema version per spec §10.2. The framework reads `type(state).schema_version` at save time. Pre-PR-4 records carrying `"1"` are reinterpreted as user-facing v1 identifiers; users with such records either declare `schema_version="1"` on their state class or discard the pre-PR-4 records. `SQLiteCheckpointer` no longer rejects records with non-default `schema_version` at the backend boundary; version-mismatch routing is now an engine concern at resume time. The `CHECKPOINT_SCHEMA_VERSION` module constant is removed; future record-shape evolution can add backend-private metadata fields if needed.
- **`NodeEvent.pre_state` typed `Any` (was `State`).** Required by the new `checkpoint_migrated` phase which carries a `_MigrationSummary` payload rather than a `State` instance. Observer authors who type-narrowed `pre_state` to `State` should treat it as `Any` and narrow per-phase (e.g., `if event.phase == "completed": ...`). The `checkpoint_saved` phase already carried a State-flavored shape (not necessarily a typed `State` subclass instance), so this widens the declared type to match runtime reality rather than introducing a new constraint.

### Notes

- **Release gate: do not tag until all of {0011, 0014, 0015, 0016, 0017} are merged.** This batch implements one proposal per PR and lands a consolidated release after the fifth PR. Cutting a release tag before the batch is complete would ship a partial spec implementation against the v0.15.0 pin.
- **Release gate cleared with PR-5 (proposal 0011).** All five proposals in the batch ({0011, 0014, 0015, 0016, 0017}) are now implemented. Tag the consolidated release once this PR merges.
- **Pre-1.0 MINOR.** Existing free-form callers (no `response_schema`) see no behavior change — the new field defaults to `None`, the wire body omits `response_format`, and `Response.parsed` remains absent.

## [0.5.0] — 2026-05-10
3 changes: 3 additions & 0 deletions docs/concepts/index.md
@@ -12,6 +12,9 @@ the framework, or jump to whichever concept you need.
data seam.
- [Fan-out](fan-out.md): running the same subgraph many times in
parallel, results merged back deterministically.
- [Parallel branches](parallel-branches.md): dispatching M
heterogeneous subgraphs concurrently with per-branch state schemas
and middleware.
- [LLMs](llms.md): how LLM calls fit into nodes, structured output,
routing on parsed fields, errors at the LLM boundary.
- [Observability](observability.md): node-boundary hooks, OTel mapping,
18 changes: 17 additions & 1 deletion docs/concepts/observability.md
@@ -78,6 +78,7 @@ class NodeEvent:
    attempt_index: int = 0
    fan_out_index: int | None = None
    fan_out_config: FanOutEventConfig | None = None
    branch_name: str | None = None
```

A walk-through:
@@ -130,7 +131,11 @@ A walk-through:
`len(parent_states) == len(namespace) - 1`.

- **`attempt_index`**: 0-based retry attempt counter. `0` for nodes
not wrapped by retry middleware; `1+` for retries.
not wrapped by retry middleware; `1+` for retries. Retry middleware
may wrap transitively — a retry on a [parallel-branches
branch](parallel-branches.md) or fan-out `instance_middleware`
re-runs the whole subgraph; events from inner nodes carry the
wrapping retry's attempt counter.

- **`fan_out_index`**: 0-based per-instance index for events inside
a fan-out instance; `None` outside.
@@ -140,6 +145,17 @@ A walk-through:
`item_count` / `concurrency` / `error_policy` / `parent_node_name`.
`None` on every other event.

- **`branch_name`**: populated on events from nodes inside a
[parallel-branches branch](parallel-branches.md), carrying the
branch's name as declared on the dispatcher. `None` outside.
Independent of `fan_out_index` — both may be present simultaneously
when a parallel-branches branch contains a fan-out (or a fan-out
instance contains a parallel-branches node). The combination
`(namespace, branch_name, fan_out_index, attempt_index, phase)`
uniquely identifies each event source. On the OTel mapping
side, an `openarmature.branch_name` span attribute is added in
parallel to the existing `openarmature.node.fan_out_index`.

## Routing errors and the completed event

When a conditional edge raises or returns an invalid target:
151 changes: 151 additions & 0 deletions docs/concepts/parallel-branches.md
@@ -0,0 +1,151 @@
# Parallel branches

Dispatch M heterogeneous subgraphs concurrently and merge their
projected outputs back into the parent through the parent's
reducers, in branch insertion order.

Sibling to [fan-out](fan-out.md) (same `for each thing, do work in
parallel` shape), but the *thing* is different per branch: a research
subgraph, a categorize subgraph, a sentiment subgraph — each with its
own state schema, its own middleware, its own observer events —
running in parallel and joining their results into one parent state.

## When to reach for parallel branches

The signal: a fixed set of named operations, each with its own
behavior and state schema, that don't depend on each other. Three
classifiers running independently against the same input. A research
step, a translate step, and a fact-check step that all want the
parent's prompt. M is known at build time and small (typically 2–6),
and each branch is its own subgraph because each has its own
internal pipeline worth modelling separately.

Fan-out is the right pick when you have N similar pieces of work,
N depends on runtime state, and the work is the same across instances.
Parallel branches is the right pick when M is a small fixed set of
different operations that happen to run concurrently.

## The shape

```python
from openarmature.graph import BranchSpec, GraphBuilder

builder.add_parallel_branches_node(
    "dispatcher",
    branches={
        "research": BranchSpec(
            subgraph=research_subgraph,      # CompiledGraph[ResearchState]
            inputs={"question": "prompt"},   # subgraph_field -> parent_field
            outputs={"facts": "facts"},      # parent_field -> subgraph_field
        ),
        "translate": BranchSpec(
            subgraph=translate_subgraph,     # CompiledGraph[TranslateState]
            inputs={"source": "prompt"},
            outputs={"translation": "translated"},
        ),
        "fact_check": BranchSpec(
            subgraph=fact_check_subgraph,    # CompiledGraph[FactCheckState]
            inputs={"claim": "prompt"},
            outputs={"verdict": "verdict"},
        ),
    },
    error_policy="fail_fast",  # or "collect"
)
```

Each branch's `subgraph` is a compiled graph; `inputs` and `outputs`
mirror the explicit projection shape from
[composition](composition.md#explicitmapping-declarative). The
branches dict's key is the branch name — used as the branch identity
on observer events (see [observability](observability.md)) and in
the per-branch error records that `error_policy="collect"`
produces.

## Per-branch state, inputs and outputs

Each branch runs its own subgraph against its own state — heterogeneous
schemas are explicit. Subgraph fields named in `inputs` are seeded
from the parent's corresponding field at branch entry; other subgraph
fields take their schema defaults. At branch exit, only the parent
fields named in `outputs` receive contributions; the rest of the
branch's final state is discarded.
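
The projection rules above can be sketched with plain dictionaries.
This is an illustration only, not the engine's implementation:
`seed_branch_state` and `project_outputs` are hypothetical helper
names, and states are modelled as dicts rather than typed `State`
classes.

```python
from typing import Any


def seed_branch_state(
    parent: dict[str, Any],
    inputs: dict[str, str],
    defaults: dict[str, Any],
) -> dict[str, Any]:
    """Start from the subgraph's schema defaults, then copy mapped parent fields."""
    state = dict(defaults)
    for subgraph_field, parent_field in inputs.items():
        state[subgraph_field] = parent[parent_field]
    return state


def project_outputs(final: dict[str, Any], outputs: dict[str, str]) -> dict[str, Any]:
    """Keep only the parent fields named in `outputs`; everything else is dropped."""
    return {
        parent_field: final[subgraph_field]
        for parent_field, subgraph_field in outputs.items()
    }


# Hypothetical translate branch: `source` is seeded from the parent's `prompt`.
parent = {"prompt": "Translate: hello", "translation": ""}
entry = seed_branch_state(
    parent,
    inputs={"source": "prompt"},
    defaults={"source": "", "translated": "", "scratch": []},
)

# Pretend the subgraph ran and filled in its own fields.
final = {**entry, "translated": "bonjour", "scratch": ["draft"]}

# Only `translation` reaches the parent; `scratch` is discarded.
contribution = project_outputs(final, outputs={"translation": "translated"})
```

Note the direction of each mapping: `inputs` is keyed by subgraph
field, `outputs` is keyed by parent field.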

When two branches contribute to the same parent field, the parent's
reducer for that field applies both values in **branch insertion
order** — first the branch declared first in the `branches` dict,
then the next, and so on. This is deterministic regardless of which
branch's inner work finishes first.
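
A minimal sketch of why the merge order is independent of completion
order, assuming an append-style reducer and using only `asyncio` from
the standard library. The `dispatch` function here is hypothetical
and is not the engine's dispatcher; it relies on `asyncio.gather`
returning results in the order its awaitables were passed, which
matches dict insertion order.

```python
import asyncio


async def dispatch(delays: dict[str, float]) -> tuple[list[str], list[str]]:
    finished: list[str] = []  # completion order, recorded only for comparison

    async def run(name: str, delay: float) -> list[str]:
        await asyncio.sleep(delay)  # stand-in for the branch's inner work
        finished.append(name)
        return [f"{name}-result"]

    # gather preserves argument order, so results follow dict insertion order.
    results = await asyncio.gather(*(run(n, d) for n, d in delays.items()))

    merged: list[str] = []
    for value in results:
        merged = merged + value  # append-style reducer applied in insertion order
    return merged, finished


merged, finished = asyncio.run(
    dispatch({"research": 0.05, "translate": 0.0, "fact_check": 0.02})
)
```

Here `research` is declared first but finishes last, and its
contribution still lands first in `merged`.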

## Error policy

- **`"fail_fast"`** (default): the first branch failure cancels
the in-flight siblings and propagates as
`ParallelBranchesBranchFailed` (a `NodeException` subtype) carrying
the failing `branch_name` and the original cause as `__cause__`.
`recoverable_state` is the parent's snapshot at the moment the
dispatcher entered — **no buffered branch contributions are
applied**, including those of branches that successfully completed
before the failure. Buffer-and-apply semantics: contributions are
held until every branch finishes, then either all apply (success)
or none apply (fail_fast failure).
- **`"collect"`**: every branch runs to completion. Successful
branches' contributions merge in insertion order; failed branches'
`outputs` projections do NOT fire (their named parent fields stay
at their defaults). If you declare `errors_field` on the dispatcher,
each failed branch produces a record with at minimum
`{"branch_name": <name>, "category": <category>}` appended to that
parent list field; the implementation may include additional keys
(message, cause_type) and tests should match by the spec-mandated
keys rather than strict equality.

## Branch middleware

Each `BranchSpec` accepts a `middleware` tuple — middlewares that
wrap that branch's whole subgraph invocation as a unit. Retry
middleware on a branch retries the **whole branch**: a fresh
subgraph invocation each time, fresh inner-node execution. The
wrapping retry's attempt counter propagates to events emitted from
inner nodes (per graph-engine §6 v0.16.1), so observer events
inside the branch correctly show `attempt_index` ticking across
retries.

Branch middleware is independent across branches: branch A may
have `(retry, timing)`, branch B may have `()`, and branch C may have
a custom breaker. Each branch's chain composes in isolation.
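
The attempt-counter propagation can be sketched with the standard
library's `contextvars`. This is a simplified, synchronous
illustration under stated assumptions: `with_retry`, `inner_node`,
and `branch_subgraph` are hypothetical, and `current_attempt_index`
is re-implemented locally rather than imported from the library.

```python
from contextvars import ContextVar
from typing import Callable

_attempt_index: ContextVar[int] = ContextVar("attempt_index", default=0)


def current_attempt_index() -> int:
    """Local stand-in for the accessor the engine reads when emitting events."""
    return _attempt_index.get()


def with_retry(fn: Callable[[], str], max_attempts: int) -> Callable[[], str]:
    def wrapped() -> str:
        for attempt in range(max_attempts):
            token = _attempt_index.set(attempt)  # visible to everything fn calls
            try:
                return fn()
            except RuntimeError:
                if attempt == max_attempts - 1:
                    raise
            finally:
                _attempt_index.reset(token)  # restore the enclosing value
        raise ValueError("max_attempts must be >= 1")

    return wrapped


events: list[tuple[str, int]] = []
calls = {"n": 0}


def inner_node() -> str:
    # An inner node has no retry of its own; it reads the wrapping counter.
    events.append(("inner", current_attempt_index()))
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"


def branch_subgraph() -> str:
    return inner_node()


result = with_retry(branch_subgraph, max_attempts=3)()
```

Because each `set` returns a token that `reset` unwinds, a nested
retry would shadow the outer counter only for the duration of its own
call, which is the innermost-wins behavior.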

## Composition with other constructs

Parallel branches compose with the rest of the engine the way
subgraphs and fan-outs do:

- A branch's subgraph can itself contain a fan-out node — inner-node
events inside that fan-out carry **both** `branch_name` (this
branch) and `fan_out_index` (the instance within this branch).
The two fields are independent.
- The parallel-branches node itself can be invoked from inside a
fan-out instance — inner events then carry the outer fan-out's
`fan_out_index` and the inner branch's `branch_name`.
- Per-graph and per-node middleware on the parallel-branches node
wrap the dispatcher as a single unit — one `started` event before
dispatch begins, one `completed` event after all branches finish
and fan-in lands. The parent's retry middleware retries the **whole
parallel-branches node**, not individual branches.

## Resume semantics

Parallel-branches nodes use the same **atomic restart** model as
fan-out (per spec §10.7): if a checkpoint resume lands on a
parallel-branches node, all branches re-dispatch from scratch.
Per-branch progress is not individually persisted in v1.

## When parallel branches is NOT the right shape

- **Not the same as N copies of one subgraph.** If you want "run
this subgraph for each item in a list," reach for
[fan-out](fan-out.md).
- **Not a router.** A router is a conditional-edge pattern — pick
one branch based on state. Parallel branches runs *all* branches
concurrently.
- **Not a coordinator.** Branches don't communicate with each other
during execution; if branch B's work depends on branch A's
output, you want a linear pipeline (A → B), not parallel branches.
2 changes: 1 addition & 1 deletion openarmature-spec
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -48,7 +48,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
Specification = "https://github.com/LunarCommand/openarmature-spec"

[tool.openarmature]
spec_version = "0.16.0"
spec_version = "0.16.1"

[dependency-groups]
dev = [
2 changes: 1 addition & 1 deletion src/openarmature/__init__.py
@@ -1,4 +1,4 @@
"""OpenArmature — workflow framework for LLM pipelines and tool-calling agents."""

__version__ = "0.5.0"
__spec_version__ = "0.16.0"
__spec_version__ = "0.16.1"
7 changes: 7 additions & 0 deletions src/openarmature/graph/__init__.py
@@ -27,6 +27,8 @@
    MultipleOutgoingEdges,
    NoDeclaredEntry,
    NodeException,
    ParallelBranchesBranchFailed,
    ParallelBranchesNoBranches,
    ReducerError,
    RoutingError,
    RuntimeGraphError,
@@ -45,6 +47,7 @@
)
from .nodes import FunctionNode, Node
from .observer import Observer, RemoveHandle, SubscribedObserver
from .parallel_branches import BranchSpec, ParallelBranchesNode
from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
from .reducers import Reducer, append, last_write_wins, merge
from .state import State
Expand Down Expand Up @@ -79,6 +82,10 @@
"NodeException",
"NoDeclaredEntry",
"Observer",
"ParallelBranchesBranchFailed",
"ParallelBranchesNoBranches",
"ParallelBranchesNode",
"BranchSpec",
"ProjectionStrategy",
"Reducer",
"ReducerError",