From 2509835f2c3281699a95051cbd859700387a281f Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 19:19:24 -0700 Subject: [PATCH 1/2] Add caller-supplied invocation metadata (proposal 0034) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements observability §3.4 + §5.6 + §8.4.1/§8.4.2: callers pass ``invoke(metadata={...})`` and the framework propagates the entries to every observability backend. Public surface: - ``compiled.invoke(metadata=...)`` accepts a per-invocation ``dict[str, AttributeValue]`` mapping where values are OTel scalars or homogeneous arrays (str/int/float/bool). - ``openarmature.observability.set_invocation_metadata(**entries)`` augments the in-scope mapping mid-invocation (additive merge; existing keys overwritten). - ``openarmature.observability.current_invocation_metadata()`` reads the in-scope mapping (returns empty MappingProxyType outside an invocation). - Synchronous validation at the API boundary rejects keys under the reserved ``openarmature.*`` / ``gen_ai.*`` prefixes and rejects non-OTel-compatible value types (``ValueError`` before any work begins). Engine internals: - ContextVar lifecycle in ``openarmature.observability.metadata``; engine drives ``_set_invocation_metadata`` / ``_reset_invocation_metadata`` around the outermost ``invoke()`` call. - ``NodeEvent.caller_invocation_metadata`` carries a dispatch-time snapshot (the deliver_loop task's Context is frozen at invoke time, so observers can't re-read the live ContextVar safely). - ``LlmEventPayload.caller_invocation_metadata`` mirrors the pattern for LLM provider events. OTel observer (§5.6): emits each entry as ``openarmature.user.<key>`` on every span — invocation, node, subgraph wrapper, fan-out instance dispatch, LLM provider, detached roots, checkpoint-migrate, checkpoint-save. Cross-cutting attribute family parallel to ``openarmature.correlation_id``.
Langfuse observer (§8.4.1 + §8.4.2): merges each entry as a top-level key into ``trace.metadata`` (at trace open) and into every observation's ``metadata`` bag (leaf nodes, subgraph wrappers, fan-out instance dispatches, detached-trace wrappers + link observations, LLM generations). Conformance fixtures (proposal 0035's full set is 026 / 027 / 028 / 029 / 030): - 026 (OTel cross-cutting) and 027 (Langfuse top-level merge) activated. - 028 (boundary rejection) activated via a dedicated ``_run_fixture_028`` runner that attaches both observers and asserts ``ValueError`` + no spans + no Langfuse traces. - 029 (fan-out per-instance) and 030 (parallel-branches per-branch) stay deferred — they need an ``augment_metadata_from_field`` harness primitive that calls ``set_invocation_metadata`` per fan-out instance / per parallel branch. The augmentation surface is already exercised by unit tests; the conformance fixtures un-defer in a follow-up. Adds 25 focused unit tests covering boundary validation, ContextVar lifecycle, augmentation merge / overwrite, reserved-namespace rejection, ``invoke()``-boundary rejection, mid-invocation augmentation persisting to subsequent nodes, ``openarmature.user.*`` emission on every OTel span, and Langfuse top-level merge on trace + every observation. The OTel-side LLM-payload runner was extended to handle multi-node graphs where ``calls_llm`` is on a non-entry node (fixture 026 has a ``prep`` step before the LLM call). 
--- src/openarmature/graph/compiled.py | 37 ++ src/openarmature/graph/events.py | 26 +- src/openarmature/llm/providers/openai.py | 2 + src/openarmature/observability/__init__.py | 12 + .../observability/langfuse/observer.py | 34 ++ src/openarmature/observability/llm_event.py | 14 +- src/openarmature/observability/metadata.py | 219 +++++++++++ .../observability/otel/observer.py | 28 +- tests/conformance/test_fixture_parsing.py | 31 +- tests/conformance/test_observability.py | 156 +++++++- .../test_observability_langfuse.py | 7 + tests/unit/test_observability_metadata.py | 340 ++++++++++++++++++ 12 files changed, 890 insertions(+), 16 deletions(-) create mode 100644 src/openarmature/observability/metadata.py create mode 100644 tests/unit/test_observability_metadata.py diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py index 63ff899..0c33531 100644 --- a/src/openarmature/graph/compiled.py +++ b/src/openarmature/graph/compiled.py @@ -78,6 +78,12 @@ current_active_observer_span, current_attempt_index, ) +from openarmature.observability.metadata import ( + _reset_invocation_metadata, + _set_invocation_metadata, + current_invocation_metadata, + validate_invocation_metadata, +) from .edges import END, ConditionalEdge, EndSentinel, StaticEdge from .errors import ( @@ -767,6 +773,7 @@ async def invoke( *, correlation_id: str | None = None, resume_invocation: str | None = None, + metadata: Mapping[str, Any] | None = None, ) -> StateT: """Run the graph from ``initial_state`` to END and return the final state. @@ -805,8 +812,33 @@ async def invoke( own retry logic if transient backend failures should be reattempted. + **Caller-supplied invocation metadata (proposal 0034).** + + - ``metadata`` is an optional mapping of arbitrary + ``key → value`` entries the framework propagates to every + observability backend. Values MUST be OTel-attribute- + compatible scalars (``str`` / ``int`` / ``float`` / ``bool``) + or homogeneous arrays of those types. 
Keys MUST NOT use + the ``openarmature.*`` or ``gen_ai.*`` reserved namespaces. + Validation runs synchronously at the API boundary; rule + violations raise ``ValueError`` BEFORE any work begins. + - Per spec §5.6 the OTel observer emits each entry as an + ``openarmature.user.`` cross-cutting span attribute on + every span and OTel log record. The Langfuse observer + merges each entry into ``trace.metadata`` AND every + ``observation.metadata`` (top level, sibling to + ``correlation_id``). + - Mid-invocation augmentation via + :func:`openarmature.observability.set_invocation_metadata` + merges into the same ContextVar with the same validation + rules; affects spans emitted AFTER the call returns. + Raises one of the runtime error categories on failure. """ + # Validate caller-supplied metadata at the API boundary so any + # rule violation surfaces synchronously before the worker task + # is created or any node body runs. + validated_metadata = validate_invocation_metadata(metadata) invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ())) queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() @@ -943,6 +975,7 @@ async def invoke( # "per-invocation is OUTERMOST invoke" wording). 
correlation_token = _set_correlation_id(resolved_correlation_id) invocation_token = _set_invocation_id(invocation_id) + metadata_token = _set_invocation_metadata(validated_metadata) worker = asyncio.create_task(deliver_loop(queue, context.drain_counters)) self._active_workers[worker] = context # Auto-prune: when the worker completes (after the sentinel is @@ -978,6 +1011,7 @@ async def invoke( try: return await self._invoke(starting_state, context) finally: + _reset_invocation_metadata(metadata_token) _reset_invocation_id(invocation_token) _reset_correlation_id(correlation_token) # Sentinel terminates the worker after it processes events @@ -1988,6 +2022,7 @@ def _dispatch_started( fan_out_config=fan_out_config, branch_name=current_branch_name(), subgraph_identities=context.subgraph_identities, + caller_invocation_metadata=current_invocation_metadata(), ), ) @@ -2022,6 +2057,7 @@ def _dispatch_completed( fan_out_config=fan_out_config, branch_name=current_branch_name(), subgraph_identities=context.subgraph_identities, + caller_invocation_metadata=current_invocation_metadata(), ), ) @@ -2205,5 +2241,6 @@ async def _maybe_save_checkpoint( attempt_index=attempt_index, fan_out_index=None, subgraph_identities=context.subgraph_identities, + caller_invocation_metadata=current_invocation_metadata(), ), ) diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 795fd8e..9fd8448 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -13,12 +13,21 @@ Frozen dataclass; observers receive a snapshot, not a live handle. 
""" -from dataclasses import dataclass +from collections.abc import Mapping +from dataclasses import dataclass, field +from types import MappingProxyType from typing import Any, Literal +from openarmature.observability.metadata import AttributeValue + from .errors import RuntimeGraphError from .state import State +# Sentinel empty metadata mapping for events constructed without a +# live caller-metadata snapshot (test helpers, synthetic events). +# Read-only proxy keeps the default allocation-free. +_EMPTY_METADATA: MappingProxyType[str, AttributeValue] = MappingProxyType({}) + # Spec: realizes observability §5.4 fan-out attributes via the # event-payload mechanism added by proposal 0013 (v0.10.0). Backend @@ -205,6 +214,21 @@ class NodeEvent: # empty string when ``None`` per §5.3's "if the implementation # tracks one" clause. subgraph_identities: tuple[str | None, ...] = () + # Per observability §3.4 + §5.6 (proposal 0034): snapshot of the + # caller-supplied invocation metadata at event-construction + # time. The engine reads ``current_invocation_metadata()`` when + # it constructs the event (in the engine task / node body's + # Context); the observer reads from the snapshot on the event + # rather than re-reading the ContextVar at observer time — + # critical because the observer runs on the engine's + # ``deliver_loop`` task whose Context is frozen at invoke time + # (asyncio.create_task copies the parent Context at task + # creation), so the live ContextVar value in the deliver_loop + # would NOT reflect mid-invocation augmentations made by node + # bodies running in the main engine task. Observers emit each + # entry as ``openarmature.user.`` (OTel, §5.6) / + # ``metadata.`` (Langfuse, §8.4.1+§8.4.2). 
+ caller_invocation_metadata: Mapping[str, AttributeValue] = field(default_factory=lambda: _EMPTY_METADATA) __all__ = ["FanOutEventConfig", "NodeEvent"] diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index fa90de7..d01fff1 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -60,6 +60,7 @@ current_namespace_prefix, ) from openarmature.observability.llm_event import LlmEventPayload +from openarmature.observability.metadata import current_invocation_metadata # ``current_prompt_group`` / ``current_prompt_result`` are imported # lazily inside :meth:`OpenAIProvider.complete` to avoid a module-load @@ -1264,6 +1265,7 @@ def _make_llm_event( response_id=response_id, response_model=response_model, genai_system=genai_system, + caller_invocation_metadata=dict(current_invocation_metadata()), ) return NodeEvent( node_name="openarmature.llm.complete", diff --git a/src/openarmature/observability/__init__.py b/src/openarmature/observability/__init__.py index 7a751a1..3c3d1a0 100644 --- a/src/openarmature/observability/__init__.py +++ b/src/openarmature/observability/__init__.py @@ -41,6 +41,16 @@ # ``opentelemetry-sdk`` dependency) along. from .llm_event import LLM_NAMESPACE, LlmEventPayload +# v0.10.0 (proposal 0034): caller-supplied invocation metadata surface. +# `set_invocation_metadata` is the public augmentation helper users +# call from inside node bodies / middleware / observers; +# `current_invocation_metadata` is the public reader observers and +# capability code consume. 
+from .metadata import ( + current_invocation_metadata, + set_invocation_metadata, +) + __all__ = [ "LLM_NAMESPACE", "LlmEventPayload", @@ -50,5 +60,7 @@ "current_dispatch", "current_fan_out_index", "current_invocation_id", + "current_invocation_metadata", "current_namespace_prefix", + "set_invocation_metadata", ] diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 4ea7bae..da31acb 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -24,6 +24,7 @@ import json import uuid +from collections.abc import Mapping from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, cast @@ -80,6 +81,28 @@ def _empty_str_frozenset() -> frozenset[str]: return frozenset() +def _apply_caller_metadata(metadata: dict[str, Any], caller_metadata: Mapping[str, Any]) -> None: + """Merge caller-supplied invocation metadata into a Trace's or + Observation's metadata bag at top level per observability §8.4.1 + + §8.4.2 (proposal 0034). + + Top-level placement is by spec: Langfuse UI filters on + ``metadata.`` directly, so caller-supplied entries become + siblings to ``correlation_id`` / ``entry_node`` rather than + nested under a ``user`` sub-object. + + Reserved-key collision with §8.4.1 / §8.4.2 keys + (``correlation_id``, ``entry_node``, ``spec_version``, + ``namespace``, etc.) is not currently checked here: the spec + permits the rejection to happen at either boundary, and the + ``invoke()`` API-boundary validation already rejects + ``openarmature.*`` / ``gen_ai.*`` prefixed keys. Per-Langfuse- + backend collision rejection is queued as a follow-up. 
+ """ + for key, value in caller_metadata.items(): + metadata[key] = value + + def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: """Return the compiled-subgraph identity for the wrapper at the given 1-based namespace depth, or the empty string when no @@ -366,6 +389,7 @@ def _open_trace(self, invocation_id: str, correlation_id: str | None, event: Nod } if correlation_id is not None: metadata["correlation_id"] = correlation_id + _apply_caller_metadata(metadata, event.caller_invocation_metadata) # §8.6 trace name: caller-supplied invocation label takes # precedence; entry-node name is the spec-recommended fallback. # The caller-supplied path lands in proposal 0034 (PR 4) — for @@ -533,6 +557,7 @@ def _open_subgraph_observation( } if correlation_id is not None: metadata["correlation_id"] = correlation_id + _apply_caller_metadata(metadata, event.caller_invocation_metadata) handle = self.client.span( trace_id=inv_state.trace_id, name=prefix[-1], @@ -566,6 +591,7 @@ def _open_fan_out_instance_dispatch_observation( } if correlation_id is not None: metadata["correlation_id"] = correlation_id + _apply_caller_metadata(metadata, event.caller_invocation_metadata) handle = self.client.span( trace_id=inv_state.trace_id, name=prefix[-1], @@ -622,6 +648,7 @@ def _open_detached_subgraph_trace( } if correlation_id is not None: link_metadata["correlation_id"] = correlation_id + _apply_caller_metadata(link_metadata, event.caller_invocation_metadata) parent_observation_id: str | None = None for plen in range(len(prefix) - 1, 0, -1): outer = prefix[:plen] @@ -646,6 +673,7 @@ def _open_detached_subgraph_trace( detached_metadata: dict[str, Any] = {"detached_from_invocation_id": inv_state.trace_id} if correlation_id is not None: detached_metadata["correlation_id"] = correlation_id + _apply_caller_metadata(detached_metadata, event.caller_invocation_metadata) identity = _subgraph_identity_at(event, len(prefix)) # The detached trace's wrapper observation IS the migrated # 
SubgraphNode wrapper. Per the resolution in coord thread @@ -673,6 +701,7 @@ def _open_detached_subgraph_trace( } if correlation_id is not None: dispatch_metadata["correlation_id"] = correlation_id + _apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata) handle = self.client.span( trace_id=detached_trace_id, name=wrapper_obs_name, @@ -715,6 +744,7 @@ def _open_detached_fan_out_instance_trace( } if correlation_id is not None: link_metadata["correlation_id"] = correlation_id + _apply_caller_metadata(link_metadata, event.caller_invocation_metadata) fan_out_open.handle.update(metadata=link_metadata) # Open the detached Trace + per-instance dispatch observation. detached_metadata: dict[str, Any] = { @@ -723,6 +753,7 @@ def _open_detached_fan_out_instance_trace( } if correlation_id is not None: detached_metadata["correlation_id"] = correlation_id + _apply_caller_metadata(detached_metadata, event.caller_invocation_metadata) self.client.trace( id=detached_trace_id, name=prefix[-1], @@ -736,6 +767,7 @@ def _open_detached_fan_out_instance_trace( } if correlation_id is not None: dispatch_metadata["correlation_id"] = correlation_id + _apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata) handle = self.client.span( trace_id=detached_trace_id, name=prefix[-1], @@ -866,6 +898,7 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> metadata["fan_out_item_count"] = cfg.item_count metadata["fan_out_concurrency"] = 0 if cfg.concurrency is None else cfg.concurrency metadata["fan_out_error_policy"] = cfg.error_policy + _apply_caller_metadata(metadata, event.caller_invocation_metadata) return metadata # ------------------------------------------------------------------ @@ -1005,6 +1038,7 @@ def _llm_metadata_and_payload( active_group = payload.active_prompt_group if active_group is not None: metadata["prompt_group_name"] = active_group.group_name + _apply_caller_metadata(metadata, payload.caller_invocation_metadata) 
model_parameters: dict[str, Any] = {} request_params = payload.request_params or {} diff --git a/src/openarmature/observability/llm_event.py b/src/openarmature/observability/llm_event.py index aef4b17..134f496 100644 --- a/src/openarmature/observability/llm_event.py +++ b/src/openarmature/observability/llm_event.py @@ -37,7 +37,7 @@ from typing import Any -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field # Sentinel namespace the LLM provider emits to signal "this is an LLM # event, not a regular node event." Backend mappings (the OTel observer @@ -112,6 +112,18 @@ class LlmEventPayload(BaseModel): response_id: str | None = None response_model: str | None = None genai_system: str = "openai" + # Per proposal 0034 / observability §3.4 + §5.6: snapshot of + # caller-supplied invocation metadata captured at LLM-event + # dispatch time (in the calling node's Context). Backend + # observers read from the snapshot rather than re-reading the + # ContextVar at observer time — the OTel + Langfuse observers + # run on the engine's ``deliver_loop`` task whose Context is + # frozen at invoke time, so mid-invocation augmentations made + # by node bodies running in the main engine task are NOT visible + # there. The snapshot pattern mirrors the existing + # ``calling_namespace_prefix`` / ``calling_attempt_index`` / + # ``calling_fan_out_index`` fields. + caller_invocation_metadata: dict[str, Any] = Field(default_factory=dict) __all__ = ["LLM_NAMESPACE", "LlmEventPayload"] diff --git a/src/openarmature/observability/metadata.py b/src/openarmature/observability/metadata.py new file mode 100644 index 0000000..4be0225 --- /dev/null +++ b/src/openarmature/observability/metadata.py @@ -0,0 +1,219 @@ +# Spec: realizes observability §3.4 + §5.6 (proposal 0034). 
+# Caller-supplied invocation metadata is held in a ContextVar per +# §3.4's "MUST propagate via the language's idiomatic context +# primitive"; per-async-context copy-on-write semantics give fan-out +# instances and parallel branches independent metadata views without +# explicit threading. Validation lives here so the same rules apply +# at the ``invoke()`` boundary and at mid-invocation augmentation +# via ``set_invocation_metadata``. + +"""Caller-supplied invocation metadata (proposal 0034). + +Two surfaces: + +- :func:`current_invocation_metadata` — public reader; returns the + metadata mapping in scope for the current async context, or the + empty mapping outside any invocation. +- :func:`set_invocation_metadata` — public augmentation helper. + Merges the supplied entries into the current context's metadata + (additive; existing keys are overwritten). Affects observations / + spans emitted AFTER the call returns. + +Plus the engine-internal lifecycle helpers (``_set_invocation_metadata`` / +``_reset_invocation_metadata``) that ``CompiledGraph.invoke`` drives +around the outermost call. + +Validation rules (apply at every entry point): + +- Keys MUST be strings. +- Keys MUST NOT start with ``openarmature.`` or ``gen_ai.`` (reserved + for spec-normative attribute namespaces; collisions would silently + overwrite OA-emitted state at the observer layer). +- Values MUST be OTel-attribute-compatible scalars: ``str``, ``int``, + ``float``, ``bool``, or a homogeneous list/tuple of those types. + ``None``, nested objects, and mixed-type arrays are rejected. + +All boundary violations raise :class:`ValueError` with a message +naming the offending key. +""" + +from __future__ import annotations + +from contextvars import ContextVar, Token +from types import MappingProxyType +from typing import Any, cast + +# OTel-compatible attribute value type per spec §3.4 + OTel's +# AnyValue contract. Homogeneous arrays only; the validator enforces +# the homogeneity at runtime. 
+AttributeValue = str | int | float | bool | list[str] | list[int] | list[float] | list[bool] + +# Sentinel empty mapping that the ContextVar default holds. Using a +# read-only proxy keeps the "no metadata in scope" branch +# allocation-free across calls and prevents accidental mutation of +# the default by any caller that holds the reference. +_EMPTY_METADATA: MappingProxyType[str, AttributeValue] = MappingProxyType({}) + +_invocation_metadata_var: ContextVar[MappingProxyType[str, AttributeValue]] = ContextVar( + "openarmature.invocation_metadata", default=_EMPTY_METADATA +) + +# Reserved key prefixes per §3.4. Keys with these prefixes are +# off-limits to caller-supplied metadata; the engine rejects at the +# boundary so observers never see a colliding key. +_RESERVED_PREFIXES: tuple[str, ...] = ("openarmature.", "gen_ai.") + + +def current_invocation_metadata() -> MappingProxyType[str, AttributeValue]: + """Return the caller-supplied invocation metadata in scope, or the + empty mapping outside any invocation. + + Observers and capability code (LLM provider span hook, Langfuse + observer, OTel observer) read this to surface the mapping on + backend-specific records. The returned mapping is read-only; + callers MUST NOT mutate it. Use :func:`set_invocation_metadata` + to add entries. + """ + return _invocation_metadata_var.get() + + +def set_invocation_metadata(**entries: AttributeValue) -> None: + """Merge ``entries`` into the current async context's invocation + metadata. Additive: existing keys with the same names are + overwritten; other keys are preserved. + + Per spec §3.4: affects spans / observations emitted AFTER the + call returns; spans already closed are NOT retroactively updated. + Implementations MAY update open root-level surfaces (e.g., the + Langfuse Trace's metadata) where the backend SDK supports it; + Langfuse's ``trace.update`` is the canonical example. 
The + framework's helper here just maintains the ContextVar; per- + backend update propagation is the observer's concern. + + Raises :class:`ValueError` if any key violates the reserved- + namespace rule or any value is not OTel-attribute-compatible. + + Outside any active invocation, this still updates the + ContextVar (a fresh per-context override), but the value will + not be observed by any backend since no observer is in scope. + The empty-invocation case is supported for symmetry; users + typically call this from inside a node body, middleware, or + observer where an invocation is already in flight. + """ + if not entries: + return + for key, value in entries.items(): + _validate_metadata_key(key) + _validate_metadata_value(key, value) + merged: dict[str, AttributeValue] = dict(_invocation_metadata_var.get()) + merged.update(entries) + _invocation_metadata_var.set(MappingProxyType(merged)) + + +def validate_invocation_metadata(mapping: object) -> MappingProxyType[str, AttributeValue]: + """Validate a caller-supplied metadata mapping and return the + read-only view the engine stashes on the ContextVar. + + Public so the engine (`CompiledGraph.invoke`) calls this at the + boundary BEFORE any work begins; per spec §3.4 the rejection + surfaces as a synchronous error to the caller of ``invoke()`` + rather than as a backend-emission failure. + + Returns the validated read-only mapping. Raises :class:`ValueError` + on any rule violation (with a message naming the offending key). 
+ """ + if mapping is None: + return _EMPTY_METADATA + if not isinstance(mapping, dict): + raise ValueError(f"invocation metadata must be a dict (or None); got {type(mapping).__name__}") + typed_mapping = cast("dict[Any, Any]", mapping) + validated: dict[str, AttributeValue] = {} + for key, value in typed_mapping.items(): + _validate_metadata_key(key) + _validate_metadata_value(key, value) + validated[key] = value + return MappingProxyType(validated) + + +def _validate_metadata_key(key: Any) -> None: + if not isinstance(key, str): + raise ValueError(f"invocation metadata key must be a string; got {type(key).__name__}") + for reserved in _RESERVED_PREFIXES: + if key.startswith(reserved): + raise ValueError( + f"invocation metadata key {key!r} uses reserved namespace prefix {reserved!r}; " + f"reserved prefixes are for spec-normative attributes (openarmature.*, gen_ai.*)" + ) + + +def _validate_metadata_value(key: str, value: Any) -> None: + # Scalars first — bool is checked BEFORE int because bool is a + # subclass of int in Python and the spec treats them as distinct + # AttributeValue variants. + if isinstance(value, bool): + return + if isinstance(value, (str, int, float)): + return + if isinstance(value, (list, tuple)): + seq = cast("list[Any] | tuple[Any, ...]", value) + if not seq: + # Empty arrays are accepted; the homogeneity check is + # trivially satisfied. + return + element_type: type | None = None + for element in seq: + # bool BEFORE int again for the array case. 
+ etype: type + if isinstance(element, bool): + etype = bool + elif isinstance(element, (str, int, float)): + etype = type(element) + else: + raise ValueError( + f"invocation metadata key {key!r}: array element has unsupported type " + f"{type(element).__name__}; OTel AnyValue arrays accept only " + f"str / int / float / bool elements" + ) + if element_type is None: + element_type = etype + elif element_type is not etype: + raise ValueError( + f"invocation metadata key {key!r}: array elements MUST be homogeneous; " + f"saw {element_type.__name__} and {etype.__name__}" + ) + return + raise ValueError( + f"invocation metadata key {key!r}: value type {type(value).__name__} is not " + f"OTel-attribute-compatible; allowed: str, int, float, bool, or a homogeneous list/tuple of those" + ) + + +def _set_invocation_metadata( + value: MappingProxyType[str, AttributeValue], +) -> Token[MappingProxyType[str, AttributeValue]]: + """Set the invocation metadata for the current invocation. + Internal — callers OUTSIDE the engine should not touch this; the + engine paves the lifecycle in ``CompiledGraph.invoke`` around the + outermost call. + + Use :func:`set_invocation_metadata` for mid-invocation + augmentation (the public augmentation helper validates and + merges); this internal setter is for the boundary stash only. 
+ """ + return _invocation_metadata_var.set(value) + + +def _reset_invocation_metadata( + token: Token[MappingProxyType[str, AttributeValue]], +) -> None: + _invocation_metadata_var.reset(token) + + +__all__ = [ + "AttributeValue", + "current_invocation_metadata", + "set_invocation_metadata", + "validate_invocation_metadata", + "_reset_invocation_metadata", + "_set_invocation_metadata", +] diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 63f2233..0ed63ec 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -75,7 +75,7 @@ from __future__ import annotations import json -from collections.abc import Callable, Sequence +from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, cast @@ -148,6 +148,23 @@ def _read_spec_version() -> str: return __spec_version__ +def _apply_caller_metadata(attrs: dict[str, Any], metadata: Mapping[str, Any]) -> None: + """Merge caller-supplied invocation metadata into a span's + attribute dict as ``openarmature.user.`` entries per + observability §5.6. + + Called at every span-emission site so the metadata family is + cross-cutting (invocation span, every node span, subgraph + dispatch, fan-out instance dispatch, LLM provider span, + detached roots). Source values may come from + ``NodeEvent.caller_invocation_metadata`` for graph events or + from ``LlmEventPayload.caller_invocation_metadata`` for LLM + events; both are dispatch-time snapshots. 
+ """ + for key, value in metadata.items(): + attrs[f"openarmature.user.{key}"] = value + + def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: """Return the compiled-subgraph identity for the wrapper at the given 1-based namespace depth, or the empty string when no @@ -665,6 +682,7 @@ def _emit_checkpoint_migrate_span(self, event: NodeEvent) -> None: cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid + _apply_caller_metadata(attrs, event.caller_invocation_metadata) span = self._tracer.start_span( name="openarmature.checkpoint.migrate", context=parent_ctx, @@ -698,6 +716,7 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid + _apply_caller_metadata(attrs, event.caller_invocation_metadata) span = self._tracer.start_span( name="openarmature.checkpoint.save", context=cast("Any", parent_ctx), @@ -751,6 +770,7 @@ def _handle_llm_event(self, event: NodeEvent) -> None: cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid + _apply_caller_metadata(attrs, payload.caller_invocation_metadata) # Prompt-identity attributes: sourced from the dispatch- # time snapshot on the payload. 
Reading the ContextVar # here would return None because the dispatch worker @@ -925,6 +945,7 @@ def _open_invocation_span( } if correlation_id is not None: attrs["openarmature.correlation_id"] = correlation_id + _apply_caller_metadata(attrs, event.caller_invocation_metadata) span = self._tracer.start_span( name="openarmature.invocation", kind=SpanKind.INTERNAL, @@ -1112,6 +1133,7 @@ def _open_subgraph_span( } if correlation_id is not None: attrs["openarmature.correlation_id"] = correlation_id + _apply_caller_metadata(attrs, event.caller_invocation_metadata) span = self._tracer.start_span( name=prefix[-1], context=cast("Any", parent_ctx), @@ -1166,6 +1188,7 @@ def _open_detached_subgraph_root( } if correlation_id is not None: attrs_parent["openarmature.correlation_id"] = correlation_id + _apply_caller_metadata(attrs_parent, event.caller_invocation_metadata) parent_dispatch = self._tracer.start_span( name=prefix[-1], context=cast("Any", parent_ctx_for_dispatch), @@ -1235,6 +1258,7 @@ def _open_detached_fan_out_instance_root( } if correlation_id is not None: attrs["openarmature.correlation_id"] = correlation_id + _apply_caller_metadata(attrs, event.caller_invocation_metadata) instance_root = self._tracer.start_span( name=prefix[-1], context=cast("Any", detached_parent_ctx), @@ -1289,6 +1313,7 @@ def _open_fan_out_instance_dispatch_span( } if correlation_id is not None: attrs["openarmature.correlation_id"] = correlation_id + _apply_caller_metadata(attrs, event.caller_invocation_metadata) instance_span = self._tracer.start_span( name=prefix[-1], context=cast("Any", parent_ctx), @@ -1370,6 +1395,7 @@ def _node_attrs(self, event: NodeEvent, correlation_id: str | None) -> dict[str, attrs["openarmature.fan_out.item_count"] = cfg.item_count attrs["openarmature.fan_out.concurrency"] = 0 if cfg.concurrency is None else cfg.concurrency attrs["openarmature.fan_out.error_policy"] = cfg.error_policy + _apply_caller_metadata(attrs, event.caller_invocation_metadata) return attrs # 
------------------------------------------------------------------ diff --git a/tests/conformance/test_fixture_parsing.py b/tests/conformance/test_fixture_parsing.py index 9c6d5e3..424cc8b 100644 --- a/tests/conformance/test_fixture_parsing.py +++ b/tests/conformance/test_fixture_parsing.py @@ -76,12 +76,33 @@ def _id(case: tuple[str, Path]) -> str: "observability/033-langfuse-detached-trace-mode": ( "Langfuse shape models live in the dedicated test_observability_langfuse harness" ), - # proposal 0034 caller-supplied invocation metadata fixtures (PR 4). - "observability/027-langfuse-caller-supplied-metadata": "Caller-metadata harness lands in PR 4 (0034)", - "observability/028-caller-metadata-namespace-rejection": "Caller-metadata harness lands in PR 4 (0034)", - "observability/029-caller-metadata-fan-out-per-instance": "Caller-metadata harness lands in PR 4 (0034)", + # Proposal 0034 Langfuse caller-metadata fixture uses the + # ``langfuse_observer`` / ``langfuse_trace`` directive shapes the + # cross-capability parser doesn't model. The capability-specific + # harness at tests/conformance/test_observability_langfuse.py + # parses these directly via yaml + tailored helpers. + "observability/027-langfuse-caller-supplied-metadata": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), + # Proposal 0034 boundary-rejection fixture uses rejection + # invariants (``invoke_rejects_at_api_boundary``, ``no_spans_emitted``, + # ``no_langfuse_observations_emitted``) the cross-capability + # parser doesn't model. Driven by the dedicated runner + # ``_run_fixture_028`` in test_observability.py. + "observability/028-caller-metadata-namespace-rejection": ( + "Rejection invariants live in the dedicated _run_fixture_028 runner" + ), + # Proposal 0034 fan-out / parallel-branches caller-metadata + # fixtures need the harness primitive + # ``augment_metadata_from_field`` (per-instance / per-branch + # ``set_invocation_metadata`` calls). 
The 026/027/028 fixtures + # (cross-cutting + boundary rejection) shipped with PR 4; the + # augmentation primitive lands in a follow-up. + "observability/029-caller-metadata-fan-out-per-instance": ( + "Per-instance augmentation harness primitive lands in a follow-up" + ), "observability/030-caller-metadata-parallel-branches-per-branch": ( - "Caller-metadata harness lands in PR 4 (0034)" + "Per-branch augmentation harness primitive lands in a follow-up" ), # proposal 0033 added typed directive shapes (`secondary_manager`, # `label_resolver`, `cases`) the canonical parser doesn't model. diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 9069bd0..7ce6167 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -106,6 +106,16 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # v0.24.0 — proposal 0032 (three new declared RuntimeConfig # fields surfaced as gen_ai.request.* attributes). "025-otel-llm-request-params-extended", + # v0.10.0 — proposal 0034 (caller-supplied invocation metadata + # cross-cutting on every span). 026 verifies the + # ``openarmature.user.*`` attribute family lands on the + # invocation span, every node span, and the LLM provider span. + "026-otel-caller-supplied-metadata", + # 028 — proposal 0034 API-boundary rejection: caller-supplied + # metadata keys under reserved namespaces (openarmature.*, + # gen_ai.*) MUST raise at the ``invoke()`` boundary before + # any work begins. Two cases (one per reserved prefix). 
+ "028-caller-metadata-namespace-rejection", } ) @@ -169,6 +179,8 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_010(spec) elif fixture_id == "011-otel-determinism": await _run_fixture_011(spec) + elif fixture_id == "028-caller-metadata-namespace-rejection": + await _run_fixture_028(spec) elif fixture_id in { "012-otel-llm-payload-default-off", "013-otel-llm-payload-enabled", @@ -181,6 +193,7 @@ async def test_observability_fixture(fixture_path: Path) -> None: "020-otel-llm-genai-system-override", "021-otel-llm-disable-genai-semconv", "025-otel-llm-request-params-extended", + "026-otel-caller-supplied-metadata", }: await _run_llm_payload_fixture(spec) else: @@ -845,6 +858,93 @@ def _signature( ) +async def _run_fixture_028(spec: Mapping[str, Any]) -> None: + """Proposal 0034 §3.4: caller-supplied metadata keys under + reserved namespaces (``openarmature.*``, ``gen_ai.*``) MUST + raise at the ``invoke()`` boundary before any work begins. + The harness asserts: + + - The invocation raises ``ValueError`` synchronously. + - No OTel spans are emitted (the OTel observer attached + to the graph never saw a single event). + - No Langfuse observations are emitted (the Langfuse + observer attached likewise saw nothing). + """ + from opentelemetry.sdk.trace.export import SimpleSpanProcessor # noqa: PLC0415 + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: PLC0415 + InMemorySpanExporter, + ) + + from openarmature.graph import END, GraphBuilder # noqa: PLC0415 + from openarmature.observability.langfuse import ( # noqa: PLC0415 + InMemoryLangfuseClient, + LangfuseObserver, + ) + + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + case_name = cast("str", case["name"]) + try: + # Build a minimal graph from the case's nodes/edges. The + # fixture's node is a noop update — we never expect it to + # run since the boundary rejects before any worker spins + # up. 
+ from .adapter import build_state_cls # noqa: PLC0415 + + state_cls = build_state_cls("RejectionFixtureState", case["state"]["fields"]) + builder = GraphBuilder(state_cls) + nodes_spec = cast("dict[str, Any]", case["nodes"]) + for node_name, node_spec in nodes_spec.items(): + node_dict = cast("dict[str, Any]", node_spec) + update_block = cast("dict[str, Any]", node_dict["update"]) + + def _make_body(payload: dict[str, Any]) -> Any: + async def _body(_s: Any) -> dict[str, Any]: + return dict(payload) + + return _body + + builder.add_node(node_name, _make_body(update_block)) + for edge in cast("list[dict[str, str]]", case["edges"]): + target_raw = edge["to"] + target = END if target_raw == "END" else target_raw + builder.add_edge(edge["from"], target) + builder.set_entry(cast("str", case["entry"])) + graph = builder.compile() + + exporter = InMemorySpanExporter() + otel_observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + langfuse_client = InMemoryLangfuseClient() + langfuse_observer = LangfuseObserver(client=langfuse_client) + graph.attach_observer(otel_observer) + graph.attach_observer(langfuse_observer) + + caller_metadata = cast("dict[str, Any]", case["caller_metadata"]) + try: + with pytest.raises(ValueError, match="reserved namespace prefix"): + await graph.invoke(state_cls(), metadata=caller_metadata) + finally: + otel_observer.shutdown() + + expected = cast("dict[str, Any]", case["expected"]) + if expected.get("invoke_rejects_at_api_boundary"): + # Already verified above via pytest.raises. + pass + if expected.get("no_spans_emitted"): + spans = exporter.get_finished_spans() + assert len(spans) == 0, f"expected zero spans, got {[s.name for s in spans]}" + if expected.get("no_langfuse_observations_emitted"): + # Trace MAY exist (lazy-open on first event); the + # invariant is "no observations are emitted." Since + # invoke rejects before any event fires, neither + # trace nor observations should be created. 
+ assert len(langfuse_client.traces) == 0, ( + f"expected zero Langfuse traces, got {sorted(langfuse_client.traces.keys())}" + ) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + def _normalize_attr_value(value: Any) -> Any: """OTel attribute values can be tuple or list shapes for sequence types depending on how they were set; normalize for comparison.""" @@ -1849,7 +1949,15 @@ async def _run_llm_payload_case(case: Mapping[str, Any]) -> None: # ---- Resolve harness primitives (content_repeat, base64_data_synthetic) nodes_spec = cast("dict[str, Any]", case["nodes"]) entry_name = cast("str", case["entry"]) - calls_llm_spec = cast("dict[str, Any]", nodes_spec[entry_name]["calls_llm"]) + # Most LLM-payload fixtures are single-node (the entry IS the + # calls_llm node); fixture 026 has a non-LLM ``prep`` step + # before the LLM call. Find whichever node carries ``calls_llm`` + # and treat the others as plain ``update`` nodes. + llm_node_name = next( + (name for name, spec in nodes_spec.items() if isinstance(spec, dict) and "calls_llm" in spec), + entry_name, + ) + calls_llm_spec = cast("dict[str, Any]", nodes_spec[llm_node_name]["calls_llm"]) raw_messages = cast("list[dict[str, Any]]", calls_llm_spec.get("messages", [])) materialized_messages, full_input_serialization = _materialize_messages( raw_messages, @@ -1918,12 +2026,40 @@ async def ask_llm_body(_s: Any) -> dict[str, str]: ) return {stores_in: response.message.content or ""} - builder = ( - GraphBuilder(state_cls) - .add_node(entry_name, ask_llm_body) - .add_edge(entry_name, END) - .set_entry(entry_name) - ) + # Build the graph: the calls_llm node uses ``ask_llm_body``; any + # other node carries an ``update:`` block translated to a simple + # async function that returns it verbatim. Edges come from the + # fixture's ``edges:`` list when present (multi-node case); the + # single-node case falls back to ``entry → END``. 
+ builder = GraphBuilder(state_cls) + for node_name, node_spec in nodes_spec.items(): + if node_name == llm_node_name: + builder.add_node(node_name, ask_llm_body) + continue + node_dict = cast("dict[str, Any]", node_spec) + update_block = cast("dict[str, Any] | None", node_dict.get("update")) + if update_block is None: + raise AssertionError( + f"non-LLM node {node_name!r} in LLM fixture has neither " + f"`calls_llm` nor `update`; harness needs an extension" + ) + + def _make_update_body(payload: dict[str, Any]) -> Any: + async def _body(_s: Any) -> dict[str, Any]: + return dict(payload) + + return _body + + builder.add_node(node_name, _make_update_body(update_block)) + edges_spec = cast("list[dict[str, str]] | None", case.get("edges")) + if edges_spec is None: + builder.add_edge(llm_node_name, END) + else: + for edge in edges_spec: + target_raw = edge["to"] + target = END if target_raw == "END" else target_raw + builder.add_edge(edge["from"], target) + builder.set_entry(entry_name) graph = builder.compile() # ---- Observer @@ -1940,7 +2076,11 @@ async def ask_llm_body(_s: Any) -> dict[str, str]: # ---- Run + collect spans initial_state_cls = graph.state_cls - await graph.invoke(initial_state_cls()) + invoke_kwargs: dict[str, Any] = {} + caller_metadata = cast("dict[str, Any] | None", case.get("caller_metadata")) + if caller_metadata is not None: + invoke_kwargs["metadata"] = caller_metadata + await graph.invoke(initial_state_cls(), **invoke_kwargs) await graph.drain() observer.shutdown() spans = exporter.get_finished_spans() diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index df958e9..dfac35e 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -53,6 +53,10 @@ "022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage", + # 027 — proposal 0034 (caller-supplied metadata propagation + # into 
``trace.metadata`` + every ``observation.metadata`` + # per §8.4.1 + §8.4.2). + "027-langfuse-caller-supplied-metadata", # 031 / 032 / 033 — proposal 0035 (spec v0.26.1). The # subgraph_identity wiring (per coord thread # `clarify-subgraph-name-semantics` msg 02 — Option A) is @@ -309,6 +313,9 @@ async def _run_case(case: Mapping[str, Any]) -> None: invoke_kwargs: dict[str, Any] = {} if correlation_id is not None: invoke_kwargs["correlation_id"] = correlation_id + caller_metadata = cast("dict[str, Any] | None", case.get("caller_metadata")) + if caller_metadata is not None: + invoke_kwargs["metadata"] = caller_metadata await graph.invoke(initial_state_factory(), **invoke_kwargs) await graph.drain() if provider is not None: diff --git a/tests/unit/test_observability_metadata.py b/tests/unit/test_observability_metadata.py new file mode 100644 index 0000000..45c6d5d --- /dev/null +++ b/tests/unit/test_observability_metadata.py @@ -0,0 +1,340 @@ +"""Unit tests for the caller-supplied invocation metadata surface +(proposal 0034 / observability §3.4 + §5.6 + §8.4.1+§8.4.2). + +These tests pin the validation rules, the ContextVar lifecycle, and +the mid-invocation augmentation helper. Per-async-context isolation +(fan-out instance augmentation not leaking to siblings) is NOT pinned here. +The conformance fixtures (026/027/028/029/030) cover end-to-end +observer emission against the spec's expected shapes; these unit +tests focus on the python-side surface contract. 
+""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from openarmature.graph import END, GraphBuilder, State +from openarmature.observability import ( + current_invocation_metadata, + set_invocation_metadata, +) +from openarmature.observability.metadata import ( + validate_invocation_metadata, +) + +# --------------------------------------------------------------------------- +# Boundary validation +# --------------------------------------------------------------------------- + + +def test_validate_accepts_simple_scalars() -> None: + out = validate_invocation_metadata( + { + "tenantId": "acme-corp", + "seatCount": 42, + "ratio": 0.75, + "isCanary": True, + } + ) + assert out["tenantId"] == "acme-corp" + assert out["seatCount"] == 42 + assert out["ratio"] == 0.75 + assert out["isCanary"] is True + + +def test_validate_accepts_homogeneous_arrays() -> None: + out = validate_invocation_metadata( + { + "labels": ["alpha", "beta"], + "weights": [1, 2, 3], + "scores": [0.1, 0.2], + "flags": [True, False], + } + ) + assert out["labels"] == ["alpha", "beta"] + assert out["weights"] == [1, 2, 3] + assert out["scores"] == [0.1, 0.2] + assert out["flags"] == [True, False] + + +def test_validate_none_returns_empty_mapping() -> None: + out = validate_invocation_metadata(None) + assert dict(out) == {} + + +def test_validate_rejects_openarmature_prefix() -> None: + with pytest.raises(ValueError, match=r"reserved namespace prefix 'openarmature\.'"): + validate_invocation_metadata({"openarmature.user.x": "y"}) + + +def test_validate_rejects_gen_ai_prefix() -> None: + with pytest.raises(ValueError, match=r"reserved namespace prefix 'gen_ai\.'"): + validate_invocation_metadata({"gen_ai.system": "openai"}) + + +def test_validate_rejects_non_string_key() -> None: + with pytest.raises(ValueError, match="key must be a string"): + validate_invocation_metadata({123: "v"}) # pyright: ignore[reportArgumentType] + + +def 
test_validate_rejects_none_value() -> None: + with pytest.raises(ValueError, match="value type NoneType"): + validate_invocation_metadata({"k": None}) # pyright: ignore[reportArgumentType] + + +def test_validate_rejects_nested_dict_value() -> None: + with pytest.raises(ValueError, match="value type dict"): + validate_invocation_metadata({"k": {"nested": "x"}}) # pyright: ignore[reportArgumentType] + + +def test_validate_rejects_mixed_type_arrays() -> None: + with pytest.raises(ValueError, match="MUST be homogeneous"): + validate_invocation_metadata({"mixed": [1, "two"]}) # pyright: ignore[reportArgumentType] + + +def test_validate_rejects_array_of_dicts() -> None: + with pytest.raises(ValueError, match="unsupported type"): + validate_invocation_metadata({"k": [{"a": 1}]}) # pyright: ignore[reportArgumentType] + + +def test_validate_accepts_empty_array() -> None: + out = validate_invocation_metadata({"empty": []}) + assert out["empty"] == [] + + +def test_validate_rejects_non_dict_mapping() -> None: + with pytest.raises(ValueError, match="must be a dict"): + validate_invocation_metadata("not a dict") # pyright: ignore[reportArgumentType] + + +# --------------------------------------------------------------------------- +# ContextVar reader outside any invocation +# --------------------------------------------------------------------------- + + +def test_current_invocation_metadata_empty_outside_invocation() -> None: + # Outside any invocation, the reader returns an empty mapping — + # not None — so callers can iterate without a guard. + assert dict(current_invocation_metadata()) == {} + + +# --------------------------------------------------------------------------- +# set_invocation_metadata augmentation +# --------------------------------------------------------------------------- + + +def test_set_invocation_metadata_augments_existing() -> None: + async def _runner() -> dict[str, Any]: + # Simulate the engine setting initial metadata. 
+ from openarmature.observability.metadata import ( + _set_invocation_metadata, + validate_invocation_metadata, + ) + + token = _set_invocation_metadata(validate_invocation_metadata({"tenantId": "acme"})) + try: + set_invocation_metadata(productId="p-1", batchId=42) + return dict(current_invocation_metadata()) + finally: + from openarmature.observability.metadata import _reset_invocation_metadata + + _reset_invocation_metadata(token) + + result = asyncio.run(_runner()) + # Augmentation merges with the initial mapping; nothing dropped. + assert result == {"tenantId": "acme", "productId": "p-1", "batchId": 42} + + +def test_set_invocation_metadata_overwrites_existing_key() -> None: + async def _runner() -> dict[str, Any]: + from openarmature.observability.metadata import ( + _reset_invocation_metadata, + _set_invocation_metadata, + validate_invocation_metadata, + ) + + token = _set_invocation_metadata(validate_invocation_metadata({"phase": "draft"})) + try: + set_invocation_metadata(phase="final") + return dict(current_invocation_metadata()) + finally: + _reset_invocation_metadata(token) + + result = asyncio.run(_runner()) + assert result == {"phase": "final"} + + +def test_set_invocation_metadata_rejects_reserved_namespace() -> None: + with pytest.raises(ValueError, match="reserved namespace prefix"): + set_invocation_metadata(**{"openarmature.user.x": "y"}) + + +def test_set_invocation_metadata_no_op_when_empty() -> None: + # Calling with no entries does nothing (and doesn't error). + set_invocation_metadata() + assert dict(current_invocation_metadata()) == {} + + +# --------------------------------------------------------------------------- +# Engine integration: invoke(metadata=...) 
+ boundary rejection +# --------------------------------------------------------------------------- + + +class _SimpleState(State): + counter: int = 0 + + +async def _noop_node(_s: _SimpleState) -> dict[str, Any]: + return {"counter": 1} + + +def _build_graph() -> Any: + return ( + GraphBuilder(_SimpleState) + .add_node("noop", _noop_node) + .add_edge("noop", END) + .set_entry("noop") + .compile() + ) + + +async def test_invoke_accepts_metadata() -> None: + graph = _build_graph() + await graph.invoke(_SimpleState(), metadata={"tenantId": "acme", "seatCount": 42}) + + +async def test_invoke_rejects_reserved_namespace_at_boundary() -> None: + graph = _build_graph() + with pytest.raises(ValueError, match="reserved namespace prefix"): + await graph.invoke(_SimpleState(), metadata={"openarmature.user.x": "y"}) + + +async def test_invoke_rejects_bad_value_type_at_boundary() -> None: + graph = _build_graph() + with pytest.raises(ValueError, match="value type NoneType"): + # pyright: ignore[reportArgumentType] + await graph.invoke(_SimpleState(), metadata={"k": None}) # type: ignore[dict-item] + + +async def test_invoke_resets_metadata_after_return() -> None: + graph = _build_graph() + await graph.invoke(_SimpleState(), metadata={"tenantId": "acme"}) + # After the invocation returns, the ContextVar is reset to empty + # so the next invocation gets a fresh slate. 
+ assert dict(current_invocation_metadata()) == {} + + +async def test_metadata_visible_inside_node_body() -> None: + captured: dict[str, Any] = {} + + async def _capture(_s: _SimpleState) -> dict[str, Any]: + captured.update(dict(current_invocation_metadata())) + return {"counter": 1} + + graph = ( + GraphBuilder(_SimpleState) + .add_node("capture", _capture) + .add_edge("capture", END) + .set_entry("capture") + .compile() + ) + await graph.invoke(_SimpleState(), metadata={"tenantId": "acme"}) + assert captured == {"tenantId": "acme"} + + +async def test_otel_observer_emits_user_metadata_on_every_span() -> None: + # Per observability §5.6: caller-supplied entries appear as + # `openarmature.user.` cross-cutting attributes on every + # span (invocation, node, LLM provider if present). + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + from openarmature.observability.otel import OTelObserver + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + graph = _build_graph() + graph.attach_observer(observer) + try: + await graph.invoke(_SimpleState(), metadata={"tenantId": "acme", "seatCount": 42}) + await graph.drain() + finally: + observer.shutdown() + + spans = exporter.get_finished_spans() + assert len(spans) >= 2 # invocation span + noop node span + for span in spans: + attrs = dict(span.attributes or {}) + assert attrs.get("openarmature.user.tenantId") == "acme", ( + f"span {span.name!r} missing or wrong tenantId: {attrs}" + ) + assert attrs.get("openarmature.user.seatCount") == 42, ( + f"span {span.name!r} missing or wrong seatCount: {attrs}" + ) + + +async def test_langfuse_observer_emits_user_metadata_on_trace_and_observations() -> None: + # Per observability §8.4.1 + §8.4.2: caller-supplied entries + # appear on `trace.metadata` AND on every `observation.metadata` + # at the top level. 
+ from openarmature.observability.langfuse import ( + InMemoryLangfuseClient, + LangfuseObserver, + ) + + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + graph = _build_graph() + graph.attach_observer(observer) + await graph.invoke(_SimpleState(), metadata={"tenantId": "acme", "featureFlag": "v2"}) + await graph.drain() + + assert len(client.traces) == 1 + trace = next(iter(client.traces.values())) + assert trace.metadata.get("tenantId") == "acme" + assert trace.metadata.get("featureFlag") == "v2" + # Every observation in the trace must also carry the entries. + assert len(trace.observations) >= 1 + for obs in trace.observations: + assert obs.metadata.get("tenantId") == "acme", ( + f"observation {obs.name!r} missing tenantId: {obs.metadata}" + ) + assert obs.metadata.get("featureFlag") == "v2", ( + f"observation {obs.name!r} missing featureFlag: {obs.metadata}" + ) + + +async def test_mid_invocation_augmentation_persists_to_next_node() -> None: + capture_a: dict[str, Any] = {} + capture_b: dict[str, Any] = {} + + async def _a(_s: _SimpleState) -> dict[str, Any]: + capture_a.update(dict(current_invocation_metadata())) + set_invocation_metadata(stage="a-completed") + return {"counter": 1} + + async def _b(_s: _SimpleState) -> dict[str, Any]: + capture_b.update(dict(current_invocation_metadata())) + return {"counter": 2} + + graph = ( + GraphBuilder(_SimpleState) + .add_node("a", _a) + .add_node("b", _b) + .add_edge("a", "b") + .add_edge("b", END) + .set_entry("a") + .compile() + ) + await graph.invoke(_SimpleState(), metadata={"tenantId": "acme"}) + # Node a sees the initial metadata. + assert capture_a == {"tenantId": "acme"} + # Node b sees the initial metadata PLUS node a's augmentation. + # Sequential nodes share the engine task's Context, so a's + # set_invocation_metadata persists into b's body. 
+ assert capture_b == {"tenantId": "acme", "stage": "a-completed"} From 97c07b17ad49ddb42b709e1ae5c7779040d52186 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 19:32:08 -0700 Subject: [PATCH 2/2] Carry caller metadata on synthetic checkpoint_migrated event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The synthetic NodeEvent dispatched for a checkpoint_migrated phase in CompiledGraph.invoke was missed while wiring caller-invocation- metadata snapshots through the regular _dispatch_started / _dispatch_completed sites. Result: the openarmature.checkpoint.migrate span ended up as the only span missing the cross-cutting openarmature.user.* attribute set, violating observability §5.6's "every span" invariant. The metadata ContextVar is set 22 lines earlier in invoke(), so current_invocation_metadata() returns the right value at this synthetic dispatch site. Parallel to the three other dispatch sites updated in PR #86. --- src/openarmature/graph/compiled.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py index 0c33531..303a25b 100644 --- a/src/openarmature/graph/compiled.py +++ b/src/openarmature/graph/compiled.py @@ -1006,6 +1006,7 @@ async def invoke( post_state=None, error=None, parent_states=(), + caller_invocation_metadata=current_invocation_metadata(), ), ) try: