Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions docs/agent/non-obvious-shapes.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,139 @@ The compiled graph stays usable for subsequent invocations after a timed-out dra
- `LlmProviderError` (in `openarmature.llm`) — provider call failures: `ProviderAuthentication`, `ProviderInvalidRequest`, `ProviderInvalidResponse`, `ProviderInvalidModel`, `ProviderModelNotLoaded`, `ProviderRateLimit`, `ProviderUnavailable`, `ProviderUnsupportedContentBlock`, `StructuredOutputInvalid`.

Catching `Exception` works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base — `RuntimeGraphError` covers all five spec runtime categories, `LlmProviderError` covers all nine provider categories, `CheckpointError` covers all six checkpoint categories. The `TRANSIENT_CATEGORIES` frozenset in `openarmature.llm` enumerates which provider categories are retriable.

### Reconcile `started` → `completed` pairs via a per-invocation dict keyed on `(namespace, attempt_index, fan_out_index)`

Observers receive `started` and `completed` events as a pair per node attempt, but the engine doesn't carry a `step_id`-like correlation field across the pair (it doesn't need one for its own logic — the events arrive serially per spec §6). Observer code that needs to thread per-call state — start timestamps, request payloads, custom IDs — between the two events has to reconcile manually.

The pair identity is `(namespace, attempt_index, fan_out_index)`: that triple is unique within an invocation (per graph-engine §6 uniqueness invariants). Carry per-invocation state in a `dict[invocation_id, dict[tuple, value]]` and look up on `completed`:
Comment thread
chris-colinsky marked this conversation as resolved.
Outdated

```python
class StepTimingObserver:
def __init__(self) -> None:
# invocation_id -> {(namespace, attempt_index, fan_out_index): start_ts}
self._pending: dict[str, dict[tuple[Any, ...], float]] = {}

async def __call__(self, event: NodeEvent) -> None:
invocation_id = current_invocation_id()
if invocation_id is None:
return
key = (event.namespace, event.attempt_index, event.fan_out_index)
Comment thread
chris-colinsky marked this conversation as resolved.
Outdated
if event.phase == "started":
self._pending.setdefault(invocation_id, {})[key] = time.monotonic()
elif event.phase == "completed":
start = self._pending.get(invocation_id, {}).pop(key, None)
if start is not None:
duration = time.monotonic() - start
# … emit timing
# Sweep when the dict empties (last completed for this invocation).
if not self._pending.get(invocation_id):
self._pending.pop(invocation_id, None)
```

The `_pending[invocation_id]` sub-dict naturally tracks in-flight pairs and drains as completions arrive. Sweep the outer entry when the sub-dict empties so long-running services don't accumulate per-invocation entries. If you also subscribe to drain events, that's another sweep opportunity. The same pattern works for any per-call state the observer needs to thread across the pair.

### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes

OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc. — but a custom observer that only cares about user-defined node activity sees them as noise:

```python
async def __call__(self, event: NodeEvent) -> None:
# Skip OA-internal events; only react to user node activity.
if event.namespace and event.namespace[0].startswith("openarmature."):
return
# … user-node handling
```

`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). The OA-internal events also intentionally don't carry an `invocation_id` in the way user-node events do — relying on `current_invocation_id() is None` as the filter would also work but couples to an internal detail; the namespace-prefix check is the stable contract.
Comment thread
chris-colinsky marked this conversation as resolved.
Outdated

### A `with_state_migration` recipe — register migrations alongside the state class, run on resume

`GraphBuilder.with_state_migration(s)` registers callables that transform an old-schema state record into the current schema. The engine calls them automatically on `invoke(resume_invocation=...)` when the loaded record's `schema_version` doesn't match `state_cls.schema_version`. The migration callable's signature is `(state_dict: dict, from_version: str, to_version: str) -> dict`; it receives the raw deserialized record and returns the new shape.

Wire it up at compile time:

```python
class PipelineState(State):
schema_version: ClassVar[str] = "2"
# … v2 fields

def _migrate_v1_to_v2(state_dict: dict, from_version: str, to_version: str) -> dict:
# Old field "step_count" renamed to "steps_completed" in v2.
state_dict["steps_completed"] = state_dict.pop("step_count", 0)
return state_dict

compiled = (
GraphBuilder(PipelineState)
.add_node("step", _step_body)
.add_edge("step", END)
.set_entry("step")
.with_state_migration(from_version="1", to_version="2", migrate=_migrate_v1_to_v2)
.compile()
)
compiled.attach_checkpointer(checkpointer)
```

Important detail: the migration runs once on resume, before any node body fires; the engine dispatches a synthetic `checkpoint_migrated` observer event (per spec §6 cross-ref) so observers can emit a migration span. The migrated state is what `_step_body` sees on resume — you do NOT need to handle both v1 and v2 shapes in node bodies.

When chaining multiple migrations (v1 → v2 → v3), register each step separately via repeated `with_state_migration` calls; the engine walks the chain in version order. If the chain has gaps (registered v1→v2 and v3→v4 but a record is at v2 with `to_version="4"`), the engine raises `CheckpointStateMigrationMissing` at resume time — fail-loud rather than silently skipping.

### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field`

When a fan-out's per-instance state collects a `list[X]` as its `collect_field` (e.g., each instance produces 0..N records), the engine's contribution step is `[s[cfg.collect_field] for s in successes]` — every instance's value becomes one element of the outer list. With `list[X]` per-instance, the parent receives `list[list[X]]`, and the default `append` reducer on the parent's `Annotated[list[X], append]` field preserves the nesting verbatim. Pydantic then fails to validate each `list[X]` element against `X`:

```
attributed_candidates.0 Input should be a valid dictionary or
instance of ClaimCandidate [input_value=[ClaimCandidate(...)],
input_type=list]
```

The right fix is a flattening reducer. Until OA ships the spec-blessed built-ins (proposal 0036 — `concat_flatten` for the list-of-lists case, `merge_all` for the dict-of-mappings case — accepted in spec v0.27.0 but not yet absorbed into the python impl), use a small custom reducer:

```python
from openarmature.graph import Reducer

class _ConcatFlatten(Reducer):
name = "concat_flatten"

def __call__(self, prior: list[Any], update: list[list[Any]]) -> list[Any]:
return [*prior, *(item for sublist in update for item in sublist)]

concat_flatten = _ConcatFlatten()

class PipelineState(State):
attributed_candidates: Annotated[list[ClaimCandidate], concat_flatten] = ...
```

Single-record-per-instance fan-outs (`collect_field: str`, parent field `Annotated[list[X], append]`) don't hit this — the engine still wraps each instance's value as one element, but `append` flattens it correctly since each element is already an `X`. The list-of-lists shape only emerges when the per-instance value is itself a list.

If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream — split into two fields, or pick one path.

### `invoke(metadata=...)` for caller-supplied trace identifiers (tenant IDs, request IDs, feature flags)

Per spec observability §3.4 / proposal 0034, callers attach arbitrary key/value entries at `invoke()` time and the framework propagates them to every observability backend:

```python
await compiled.invoke(
initial_state,
metadata={"tenantId": "acme-corp", "requestId": "req-12345", "featureFlag": "v2-canary"},
)
```

The OTel observer emits each entry as an `openarmature.user.<key>` cross-cutting span attribute on every span (invocation, node, subgraph wrapper, fan-out instance, LLM provider). The Langfuse observer merges each entry as a top-level key into `trace.metadata` AND every observation's metadata. Backends that consume OTel attributes (Honeycomb, Datadog APM, HyperDX, Grafana Tempo) pick the entries up for free; backends with typed metadata fields (Langfuse) get them via the per-backend propagation rule.

Boundary validation runs synchronously: keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved namespaces); values MUST be OTel-attribute-compatible scalars (`str` / `int` / `float` / `bool`) or homogeneous arrays of those. Violations raise `ValueError` before any work begins.

Mid-invocation augmentation via the public helper:

```python
from openarmature.observability import set_invocation_metadata

async def my_node(state: MyState) -> dict:
set_invocation_metadata(productId=state.product_id)
# subsequent spans (this node's completed, next node's started,
# any LLM call inside, etc.) carry productId
return {"score": await compute_score(state)}
```

The augmentation respects fan-out / parallel-branches per-instance scoping — each instance's augmentation lives in its own Context copy and doesn't leak to siblings. Sequential nodes in the same engine task see prior nodes' augmentations forward. The helper validates the same rules as the `invoke()` boundary.
92 changes: 92 additions & 0 deletions docs/concepts/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,98 @@ though their `invocation_id`s differ. It's exported from the
`current_invocation_id` (and friends) for code that needs to thread
the IDs explicitly.

## Caller-supplied invocation metadata

`correlation_id` is one string; if you also need to attach
business-domain identifiers — tenant IDs, request IDs, feature
flags, A/B cohort labels — pass them as a structured mapping at
`invoke()` time:

```python
await compiled.invoke(
initial_state,
metadata={
"tenantId": "acme-corp",
"requestId": "req-12345",
"featureFlag": "v2-canary",
"seatCount": 42,
},
)
```

Every observability backend picks the entries up:

- **OTel** emits each entry as an `openarmature.user.<key>`
cross-cutting span attribute on every span — invocation, node,
subgraph wrapper, fan-out instance, LLM provider, retry attempt.
Backends that consume OTel attributes (Phoenix / Arize, Honeycomb,
Datadog APM, HyperDX, Grafana Tempo, custom collectors) see them
uniformly without per-backend wiring.
- **Langfuse** merges each entry as a top-level key into
`trace.metadata` AND into every `observation.metadata`. The
Langfuse UI filters on `metadata.<key>` directly, so dashboard
queries like "show me all traces for `tenantId == acme-corp`"
work without any custom dashboard config.

Validation runs at the `invoke()` boundary before any work begins.
Two rules:

- **Keys** MUST NOT start with `openarmature.` or `gen_ai.`
(reserved for spec-normative attribute namespaces; collisions
would silently overwrite OA-emitted state).
- **Values** MUST be OTel-attribute-compatible scalars (`str`,
`int`, `float`, `bool`) or homogeneous arrays of those types.
`None`, nested objects, and mixed-type arrays are rejected.

Violations raise `ValueError` synchronously — no spans emitted, no
work runs.

### Adding entries mid-invocation

From inside a node body, middleware, or observer, augment the
in-scope metadata via the public helper:

```python
from openarmature.observability import set_invocation_metadata

async def evaluate_product(state: PipelineState) -> dict[str, Any]:
set_invocation_metadata(productId=state.product_id, productCategory=state.category)
# Spans emitted AFTER this call carry productId + productCategory
# in addition to whatever the original invoke() metadata supplied.
response = await provider.complete(messages)
return {"score": parse_score(response.message.content)}
```

Spans already closed are NOT retroactively updated. Spans emitted
after the call (the current node's `completed` event, the next
node's `started`, any LLM call inside) pick up the new entries.

**Per-async-context scoping.** The metadata mapping lives in a
`ContextVar`, which Python copies on async-task creation. Fan-out
instances and parallel-branches each receive their own copy at
dispatch time — an instance that calls `set_invocation_metadata`
does NOT leak its augmentation to sibling instances. This is the
canonical pattern for per-instance identifiers:

```python
# Each fan-out instance adds its own productId; siblings stay clean
async def evaluate_product(state: ProductState) -> dict[str, Any]:
set_invocation_metadata(productId=state.product_id)
return await score_product(state)
```

Augmentation within the parent context (before fan-out dispatch, or
in code that runs serially) flows forward to subsequent spans in
that context, per normal `ContextVar` semantics.

### Reading the in-scope metadata

`openarmature.observability.current_invocation_metadata()` returns
the live mapping (or an empty `MappingProxyType` outside an
invocation). Observers and capability code read this to surface
the entries on backend-specific records; user code typically uses
`set_invocation_metadata` to write and lets the framework propagate.

## OpenTelemetry mapping (opt-in)

Install with the `[otel]` extra:
Expand Down
4 changes: 4 additions & 0 deletions docs/model-providers/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ nodes call it inside their bodies.

## Where to next

- **[Self-hosted vLLM](vllm.md)**: configure `OpenAIProvider` to
talk to a self-hosted vLLM server. Covers the base-URL contract,
the legacy-server fallback flag, the `gen_ai.system` override,
and readiness-probe limitations.
- **[Authoring a Provider](authoring.md)**: how to implement the
Protocol for a non-default wire format. Includes a ~60-line
skeleton + contract checklist.
Expand Down
Loading