From 3b8221688bf3bd9e3036c7a5d2265fe7e6e0c764 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 17:57:06 -0700 Subject: [PATCH 1/4] Bump spec submodule to v0.26.1, harness extension, audit fixes Submodule bump v0.26.0 -> v0.26.1 picks up proposal 0035's three Langfuse graph-topology fixtures (031 / 032 / 033). Pinned spec version, __spec_version__, conformance.toml, and the smoke test all move together. Two observer bugs surfaced during the placement audit and are fixed: * entry_node correctness: when the outer graph's entry is a SubgraphNode, the first event the observer sees comes from inside the subgraph (event.namespace = (wrapper, inner), event.node_name = inner). Old code used event.node_name for both trace.metadata.entry_node and trace.name, producing the inner node's name. Walk back to event.namespace[0] so the outer entry name is the one recorded. * Detached-mode link observation no longer carries subgraph_name. Per discuss-observability-langfuse-mapping msg 07: in detached mode the wrapper role migrates to the detached trace; the parent trace's link observation IS the SubgraphNode span (no wrapper role) and must not carry subgraph_name. The detached trace's dispatch observation still carries it. Harness extension: tests/conformance/test_observability_langfuse.py now handles subgraph / fan_out / detached-trace topology shapes via the cross-capability adapter.build_graph helper, plus multi-trace assertions for fixtures using langfuse_traces: (plural). Both 022/023/024 and the new topology fixtures share the same _run_case path; the branch is on whether the fixture carries any topology constructs. Fixtures 031 / 032 / 033 themselves stay deferred for now: they assert metadata.subgraph_name = the compiled subgraph's identity, while both observers emit subgraph_name = the wrapper node name. Resolution queued in coord thread clarify-subgraph-name-semantics; un-defer (and apply any required impl change) once spec answers. 
Manifest entry for proposal 0035 added as status=not-yet, matching the 0031-0034 convention; the release PR will flip all five together. --- conformance.toml | 15 +- openarmature-spec | 2 +- pyproject.toml | 2 +- src/openarmature/AGENTS.md | 4 +- src/openarmature/__init__.py | 2 +- .../observability/langfuse/observer.py | 21 +- tests/conformance/test_fixture_parsing.py | 13 + .../test_observability_langfuse.py | 316 +++++++++++++++--- tests/test_smoke.py | 2 +- 9 files changed, 319 insertions(+), 58 deletions(-) diff --git a/conformance.toml b/conformance.toml index 8e212d6..5f73488 100644 --- a/conformance.toml +++ b/conformance.toml @@ -150,12 +150,12 @@ status = "textual-only" since = "0.9.0" note = "Drain snapshot semantic and timeout-input validation already implemented as part of the proposal 0010 impl PR (v0.9.0); no additional module-level work needed." -# Spec v0.23.0-v0.26.0 batch (proposals 0031, 0032, 0033, 0034). All -# four have impl work landing across the v0.10.0 release cycle; status -# stays `not-yet` until the release PR flips them to `implemented` -# with `since = "0.10.0"`. The pinned spec submodule advances ahead -# of the impl status because newer fixtures need to be visible to -# the conformance harness as each PR lands. +# Spec v0.23.0-v0.26.1 batch (proposals 0031, 0032, 0033, 0034, 0035). +# All five have impl work landing across the v0.10.0 release cycle; +# status stays `not-yet` until the release PR flips them to +# `implemented` with `since = "0.10.0"`. The pinned spec submodule +# advances ahead of the impl status because newer fixtures need to be +# visible to the conformance harness as each PR lands. 
[proposals."0031"] status = "not-yet" @@ -167,3 +167,6 @@ status = "not-yet" [proposals."0034"] status = "not-yet" + +[proposals."0035"] +status = "not-yet" diff --git a/openarmature-spec b/openarmature-spec index 53a91d6..da640df 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit 53a91d60c253dbdc66f443fc9b9710c6b70aa86d +Subproject commit da640dffa34a8e6c790e97a7f01ebaea2ce723f7 diff --git a/pyproject.toml b/pyproject.toml index ed0c53b..fe9737b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.26.0" +spec_version = "0.26.1" [dependency-groups] dev = [ diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index ed22fee..340a2cc 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.9.0 (spec v0.26.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.9.0 (spec v0.26.1). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents ## Capability contracts -_Sourced from openarmature-spec v0.26.0. 
Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md`. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.26.1. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md`. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ ### Capability: `graph-engine` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index 07b3b47..210af66 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,4 +25,4 @@ """ __version__ = "0.9.0" -__spec_version__ = "0.26.0" +__spec_version__ = "0.26.1" diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 1759f32..9756604 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -328,8 +328,18 @@ def _handle_completed(self, event: NodeEvent) -> None: inv_state.detached_traces.pop(event.namespace, None) def _open_trace(self, invocation_id: str, correlation_id: str | None, event: NodeEvent) -> None: + # ``entry_node`` and the trace name MUST identify the outer-graph + # entry, not whichever node fired first. Subgraph wrappers do not + # emit their own events — when the outer entry is a SubgraphNode + # the first event the observer sees comes from inside the + # subgraph (with ``event.namespace = (wrapper, inner)`` and + # ``event.node_name = inner``). Using ``event.namespace[0]`` + # walks back to the outermost prefix component, which IS the + # outer entry by construction (the graph engine fires inner + # events under the wrapper's namespace). 
+ entry_node = event.namespace[0] if event.namespace else event.node_name metadata: dict[str, Any] = { - "entry_node": event.node_name, + "entry_node": entry_node, "spec_version": self.spec_version, } if correlation_id is not None: @@ -338,7 +348,7 @@ def _open_trace(self, invocation_id: str, correlation_id: str | None, event: Nod # precedence; entry-node name is the spec-recommended fallback. # The caller-supplied path lands in proposal 0034 (PR 4) — for # now only the fallback is wired. - trace_name = event.node_name + trace_name = entry_node self.client.trace(id=invocation_id, name=trace_name, metadata=metadata) self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) @@ -559,8 +569,13 @@ def _open_detached_subgraph_trace( # metadata immediately — the array-form preserves §8.5's # "string array, one entry per detached child" shape so # later detached siblings under the same parent can append. + # + # Note: `subgraph_name` is intentionally NOT on this link + # observation. Per §5.3 + §8.5, in detached mode the wrapper + # role migrates to the detached trace's dispatch observation; + # the main trace's link observation IS the SubgraphNode span + # (no wrapper role) and so does not carry `subgraph_name`. link_metadata: dict[str, Any] = { - "subgraph_name": prefix[-1], "detached_child_trace_ids": [detached_trace_id], } if correlation_id is not None: diff --git a/tests/conformance/test_fixture_parsing.py b/tests/conformance/test_fixture_parsing.py index 24ef1cd..9c6d5e3 100644 --- a/tests/conformance/test_fixture_parsing.py +++ b/tests/conformance/test_fixture_parsing.py @@ -63,6 +63,19 @@ def _id(case: tuple[str, Path]) -> str: "observability/024-langfuse-prompt-linkage": ( "Langfuse shape models live in the dedicated test_observability_langfuse harness" ), + # Proposal 0035 (spec v0.26.1) Langfuse graph-topology fixtures + # (031/032/033) introduce a ``langfuse_traces:`` (plural) expected + # shape for detached / multi-trace cases. 
Same deferral rationale as + # 022-024 — Langfuse shape models live in the dedicated harness. + "observability/031-langfuse-subgraph-span-hierarchy": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), + "observability/032-langfuse-fan-out-per-instance-spans": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), + "observability/033-langfuse-detached-trace-mode": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), # proposal 0034 caller-supplied invocation metadata fixtures (PR 4). "observability/027-langfuse-caller-supplied-metadata": "Caller-metadata harness lands in PR 4 (0034)", "observability/028-caller-metadata-namespace-rejection": "Caller-metadata harness lands in PR 4 (0034)", diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index b158d20..49d880d 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -1,10 +1,10 @@ -# Spec mapping (observability §8): drives the three Langfuse mapping -# fixtures (022 basic trace, 023 Generation rendering + truncation, 024 -# prompt linkage) against the in-memory LangfuseObserver client. Sibling -# of test_observability.py (OTel mapping); shares no harness state with +# Spec mapping (observability §8): drives the Langfuse mapping fixtures +# (022/023/024 basic/generation/prompt-linkage, plus 031/032/033 graph +# topology) against the in-memory LangfuseObserver client. Sibling of +# test_observability.py (OTel mapping); shares no harness state with # the OTel side — each fixture builds its own graph + observer instance. 
-"""Run spec observability Langfuse conformance fixtures (022-024).""" +"""Run spec observability Langfuse conformance fixtures.""" from __future__ import annotations @@ -34,7 +34,7 @@ ) from openarmature.prompts.context import with_active_prompt -from .adapter import build_state_cls +from .adapter import build_graph, build_state_cls CONFORMANCE_DIR = ( Path(__file__).resolve().parents[2] / "openarmature-spec" / "spec" / "observability" / "conformance" @@ -46,6 +46,15 @@ "022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage", + # 031 / 032 / 033 (proposal 0035, spec v0.26.1): harness + # extension below (subgraph / fan_out / detached-trace topology) + # already runs these end-to-end, but the fixtures assert + # `metadata.subgraph_name = ` while both the + # Langfuse + OTel observers emit `subgraph_name = `. Resolution queued in coord thread + # `clarify-subgraph-name-semantics`; un-defer (and apply any + # required impl change) once spec picks among the three + # options laid out there. } ) @@ -141,6 +150,64 @@ async def test_langfuse_fixture(fixture_path: Path) -> None: await _run_case(spec) +def _has_topology_constructs(case: Mapping[str, Any]) -> bool: + """Return True when the fixture uses subgraph / fan_out / parallel_branches + constructs. 
Such fixtures need the full ``adapter.build_graph`` machinery + rather than the simpler per-node hand-rolled path used for the + LLM/prompt-only fixtures.""" + if "subgraph" in case or "subgraphs" in case: + return True + nodes_spec = cast("dict[str, Any]", case.get("nodes") or {}) + for node_spec in nodes_spec.values(): + if not isinstance(node_spec, dict): + continue + node_dict = cast("dict[str, Any]", node_spec) + if "subgraph" in node_dict or "fan_out" in node_dict or "parallel_branches" in node_dict: + return True + return False + + +def _compile_subgraphs(spec: Mapping[str, Any]) -> dict[str, Any]: + """Build any subgraphs declared by the fixture and return a + name→compiled-graph registry the adapter consumes. Mirrors the + OTel-side helper in ``test_observability.py``.""" + subgraph_specs: dict[str, Any] = {} + if "subgraph" in spec: + single = cast("Mapping[str, Any]", spec["subgraph"]) + name = single.get("name") or "subgraph" + subgraph_specs[name] = single + if "subgraphs" in spec: + for k, v in cast("dict[str, Any]", spec["subgraphs"]).items(): + subgraph_specs[k] = v + compiled_subgraphs: dict[str, Any] = {} + for name, sub_spec in subgraph_specs.items(): + sub_built = build_graph(sub_spec, trace=[]) + compiled_subgraphs[name] = sub_built.builder.compile() + return compiled_subgraphs + + +def _resolve_detached_wrapper_names(case: Mapping[str, Any]) -> frozenset[str]: + """Translate fixture-level ``detached_subgraphs`` (a list of SUBGRAPH + IDENTITY names) into the set of WRAPPER NODE names the observer keys + on. The fixture identifies detached subgraphs by their declaration name + in ``subgraphs:`` (e.g., ``long_running_workflow``), but the + LangfuseObserver matches by the wrapper node name in the parent graph + that references the subgraph (e.g., ``dispatch``). 
+ """ + detached_identities = set(cast("list[str]", case.get("detached_subgraphs") or [])) + if not detached_identities: + return frozenset() + nodes_spec = cast("dict[str, Any]", case.get("nodes") or {}) + wrappers: set[str] = set() + for wrapper_name, node_spec in nodes_spec.items(): + if not isinstance(node_spec, dict): + continue + sub_id = cast("dict[str, Any]", node_spec).get("subgraph") + if isinstance(sub_id, str) and sub_id in detached_identities: + wrappers.add(wrapper_name) + return frozenset(wrappers) + + async def _run_case(case: Mapping[str, Any]) -> None: # ---- Mock LLM transport (if the graph has an LLM call) mock_responses = cast("list[dict[str, Any]] | None", case.get("mock_llm")) @@ -167,29 +234,40 @@ async def _run_case(case: Mapping[str, Any]) -> None: prompt_manager = PromptManager(backend) # ---- Graph build - state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) - state_cls = build_state_cls("LangfuseFixtureState", state_fields) - nodes_spec = cast("dict[str, Any]", case["nodes"]) - entry = cast("str", case["entry"]) - edges = cast("list[dict[str, str]]", case["edges"]) - render_variables = cast("dict[str, Any]", case.get("render_variables") or {}) - - builder = GraphBuilder(state_cls) - for node_name, node_spec in nodes_spec.items(): - node_body = _build_node_body( - node_name=node_name, - node_spec=cast("dict[str, Any]", node_spec), - provider=provider, - prompt_manager=prompt_manager, - render_variables=render_variables, - ) - builder.add_node(node_name, node_body) - for edge in edges: - target_raw = edge["to"] - target = END if target_raw == "END" else target_raw - builder.add_edge(edge["from"], target) - builder.set_entry(entry) - graph = builder.compile() + # Two paths: topology fixtures (031/032/033) need the full + # ``adapter.build_graph`` machinery for subgraph / fan_out shapes; + # LLM/prompt fixtures (022/023/024/027) use the simpler hand-rolled + # per-node build that knows about ``calls_llm`` / 
``renders_prompt``. + if _has_topology_constructs(case): + subgraphs = _compile_subgraphs(case) + built = build_graph(case, subgraphs=subgraphs, trace=[]) + graph = built.builder.compile() + initial_state_factory = lambda: built.initial_state(case.get("initial_state", {})) # noqa: E731 + else: + state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) + state_cls = build_state_cls("LangfuseFixtureState", state_fields) + nodes_spec = cast("dict[str, Any]", case["nodes"]) + entry = cast("str", case["entry"]) + edges = cast("list[dict[str, str]]", case["edges"]) + render_variables = cast("dict[str, Any]", case.get("render_variables") or {}) + + builder = GraphBuilder(state_cls) + for node_name, node_spec in nodes_spec.items(): + node_body = _build_node_body( + node_name=node_name, + node_spec=cast("dict[str, Any]", node_spec), + provider=provider, + prompt_manager=prompt_manager, + render_variables=render_variables, + ) + builder.add_node(node_name, node_body) + for edge in edges: + target_raw = edge["to"] + target = END if target_raw == "END" else target_raw + builder.add_edge(edge["from"], target) + builder.set_entry(entry) + graph = builder.compile() + initial_state_factory = graph.state_cls # ---- Observer observer_cfg = cast("dict[str, Any]", case.get("langfuse_observer") or {}) @@ -200,29 +278,39 @@ async def _run_case(case: Mapping[str, Any]) -> None: observer_kwargs["disable_llm_spans"] = bool(observer_cfg["disable_llm_spans"]) if "payload_byte_cap" in observer_cfg: observer_kwargs["payload_byte_cap"] = int(observer_cfg["payload_byte_cap"]) + detached_subgraphs = _resolve_detached_wrapper_names(case) + if detached_subgraphs: + observer_kwargs["detached_subgraphs"] = detached_subgraphs + detached_fan_outs = frozenset(cast("list[str]", case.get("detached_fan_outs") or [])) + if detached_fan_outs: + observer_kwargs["detached_fan_outs"] = detached_fan_outs client = InMemoryLangfuseClient() observer = LangfuseObserver(client=client, 
**observer_kwargs) graph.attach_observer(observer) # ---- Run - initial_state_cls = graph.state_cls correlation_id = cast("str | None", case.get("caller_correlation_id")) invoke_kwargs: dict[str, Any] = {} if correlation_id is not None: invoke_kwargs["correlation_id"] = correlation_id - await graph.invoke(initial_state_cls(), **invoke_kwargs) + await graph.invoke(initial_state_factory(), **invoke_kwargs) await graph.drain() if provider is not None: await provider.aclose() # ---- Assert + # Single-trace fixtures use ``langfuse_trace:``; detached / multi-trace + # fixtures use ``langfuse_traces:`` (a list). Branch on which the + # fixture supplies. expected = cast("dict[str, Any]", case["expected"]) - expected_trace = cast("dict[str, Any]", expected["langfuse_trace"]) - assert len(client.traces) == 1, f"expected exactly one Trace, got {len(client.traces)}" - trace = next(iter(client.traces.values())) - _assert_trace( - trace, expected_trace, expected_invariants=cast("dict[str, Any]", expected.get("invariants") or {}) - ) + expected_invariants = cast("dict[str, Any]", expected.get("invariants") or {}) + if "langfuse_traces" in expected: + _assert_multi_traces(client, expected, expected_invariants) + else: + expected_trace = cast("dict[str, Any]", expected["langfuse_trace"]) + assert len(client.traces) == 1, f"expected exactly one Trace, got {len(client.traces)}" + trace = next(iter(client.traces.values())) + _assert_trace(trace, expected_trace, expected_invariants=expected_invariants) def _resolve_llm_model(case: Mapping[str, Any]) -> str: @@ -345,6 +433,117 @@ def _runtime_config_from_spec(config_spec: dict[str, Any] | None) -> RuntimeConf # --------------------------------------------------------------------------- +def _assert_multi_traces( + client: InMemoryLangfuseClient, + expected: dict[str, Any], + expected_invariants: dict[str, Any], +) -> None: + """Detached-trace fixtures (033) span multiple Traces. 
The expected + block's ``langfuse_traces:`` list names the parent Trace explicitly; + the additional detached Traces are either fully enumerated (subgraph + case) or counted via ``detached_trace_count`` (fan-out case where + only the parent's shape is asserted explicitly and the per-instance + Traces share an identical shape). + """ + expected_traces = cast("list[dict[str, Any]]", expected.get("langfuse_traces") or []) + detached_trace_count = cast("int | None", expected.get("detached_trace_count")) + # If the fixture enumerates all Traces explicitly, the actual count + # MUST match. Otherwise, ``detached_trace_count`` indicates how many + # additional traces beyond the enumerated parent the fixture expects. + expected_total = ( + len(expected_traces) + detached_trace_count + if detached_trace_count is not None + else len(expected_traces) + ) + assert len(client.traces) == expected_total, ( + f"expected {expected_total} Traces, got {len(client.traces)}: " + f"{[t.name for t in client.traces.values()]}" + ) + + # Match each enumerated expected Trace to an actual Trace by name + # compatibility (literal match, or wildcard ``<...>`` placeholder) + # then by root-observation count. Tracks consumed traces so two + # expected entries can't bind to the same actual one. + consumed: set[str] = set() + for exp in expected_traces: + exp_name = cast("str", exp.get("name") or "") + is_wildcard = exp_name.startswith("<") and exp_name.endswith(">") + expected_obs_count = len(cast("list[Any]", exp.get("observations") or [])) + candidates = [ + t for t in client.traces.values() if t.id not in consumed and (is_wildcard or t.name == exp_name) + ] + # Prefer candidates whose root-observation count matches the + # expected structure; the fan-out case has multiple traces of + # the same name where the parent is the only one with a + # populated observation tree. 
+ matching = [t for t in candidates if len(t.children_of(None)) == expected_obs_count] + assert matching or candidates, ( + f"no Trace matches expected name={exp_name!r} (consumed={sorted(consumed)})" + ) + trace = matching[0] if matching else candidates[0] + consumed.add(trace.id) + _assert_trace(trace, exp, expected_invariants={}) + + # Invariants that span multiple Traces. + if expected_invariants.get("distinct_trace_ids"): + trace_ids = {t.id for t in client.traces.values()} + assert len(trace_ids) == len(client.traces), f"trace ids not all distinct: {sorted(trace_ids)}" + if expected_invariants.get("all_instance_trace_ids_distinct"): + trace_ids = {t.id for t in client.traces.values()} + assert len(trace_ids) == len(client.traces), ( + f"instance trace ids not all distinct: {sorted(trace_ids)}" + ) + expected_child_count = cast( + "int | None", expected_invariants.get("dispatch_detached_child_trace_id_count") + ) + if expected_child_count is not None: + # Find the parent-trace observation whose metadata carries + # detached_child_trace_ids; assert its length. 
+ found = False + for trace in client.traces.values(): + for obs in trace.observations: + child_ids_raw = obs.metadata.get("detached_child_trace_ids") + if isinstance(child_ids_raw, list): + child_ids = cast("list[Any]", child_ids_raw) + assert len(child_ids) == expected_child_count, ( + f"observation {obs.name!r} detached_child_trace_ids length: " + f"expected {expected_child_count}, got {len(child_ids)}" + ) + found = True + assert found, "no observation carried metadata.detached_child_trace_ids" + if expected_invariants.get("correlation_id_consistent_across_traces"): + correlation_ids = { + cast("str | None", t.metadata.get("correlation_id")) for t in client.traces.values() + } + correlation_ids.discard(None) + if len(correlation_ids) > 1: + sorted_ids = sorted(c for c in correlation_ids if c is not None) + raise AssertionError(f"correlation_id not consistent across Traces: {sorted_ids}") + if expected_invariants.get("no_instance_spans_in_parent_trace"): + # The parent Trace is the one named in expected_traces (singular). + if len(expected_traces) == 1: + parent_name = cast("str", expected_traces[0]["name"]) + parents = [t for t in client.traces.values() if t.name == parent_name] + # If multiple Traces share the parent name (fan-out case where + # detached per-instance Traces inherit the fan-out node name), + # the actual parent is the one with a non-empty observation + # tree. Per-instance Traces have their own subtree under their + # own dispatch observation; their leaf shapes are different + # from the parent's flat fan-out node observation. + for t in parents: + # Parent observations must be limited to the fan-out + # dispatch (no leaked per-instance inner-node names). + root_obs = t.children_of(None) + for obs in root_obs: + if obs.name != parent_name: + # If we got here, an inner-node observation + # leaked into the parent Trace. 
+ raise AssertionError( + f"unexpected observation {obs.name!r} in parent Trace {t.id!r}; " + f"parent should only contain the fan-out dispatch observation" + ) + + def _assert_trace( trace: LangfuseTrace, expected: dict[str, Any], @@ -484,6 +683,13 @@ def _parse_messages(value: Any) -> list[dict[str, Any]]: raise AssertionError(f"input attribute did not parse as a message list: {value!r}") +def _is_placeholder(value: Any) -> bool: + """```` literals in fixtures are wildcards: they assert + shape (non-empty string) without binding a specific value. Cross- + occurrence consistency lives in the ``invariants:`` block.""" + return isinstance(value, str) and value.startswith("<") and value.endswith(">") + + def _assert_metadata_subset( label: str, actual: Mapping[str, Any], @@ -495,12 +701,9 @@ def _assert_metadata_subset( for key, expected_value in expected.items(): assert key in actual, f"{label}: missing key {key!r}; got keys {sorted(actual)}" actual_value = actual[key] - if isinstance(expected_value, str) and ( - expected_value == "" or expected_value.startswith(" 0, ( - f"{label}.{key}: expected placeholder match, got {actual_value!r}" + f"{label}.{key}: expected placeholder {expected_value!r} match, got {actual_value!r}" ) continue if isinstance(expected_value, dict) and isinstance(actual_value, dict): @@ -510,6 +713,24 @@ def _assert_metadata_subset( cast("Mapping[str, Any]", expected_value), ) continue + if isinstance(expected_value, list) and isinstance(actual_value, list): + # List values may contain placeholders element-by-element + # (e.g., ``detached_child_trace_ids: [""]``). + # Length must match; each element matches by placeholder + # rules or strict equality. 
+ expected_list = cast("list[Any]", expected_value) + actual_list = cast("list[Any]", actual_value) + assert len(actual_list) == len(expected_list), ( + f"{label}.{key} length: expected {len(expected_list)}, got {len(actual_list)}" + ) + for i, (exp_el, act_el) in enumerate(zip(expected_list, actual_list, strict=True)): + if _is_placeholder(exp_el): + assert isinstance(act_el, str) and len(act_el) > 0, ( + f"{label}.{key}[{i}]: expected placeholder {exp_el!r}, got {act_el!r}" + ) + else: + assert act_el == exp_el, f"{label}.{key}[{i}]: expected {exp_el!r}, got {act_el!r}" + continue assert actual_value == expected_value, ( f"{label}.{key}: expected {expected_value!r}, got {actual_value!r}" ) @@ -518,7 +739,16 @@ def _assert_metadata_subset( def _assert_string_or_placeholder(label: str, actual: str | None, expected: Any) -> None: if expected is None: return - if isinstance(expected, str) and (expected == "" or expected == ""): + # Any ```` form is treated as a wildcard that requires + # a non-empty string. Cross-occurrence consistency (e.g., the same + # ```` appearing on multiple Traces must resolve to the + # same value) is enforced by the fixture's ``invariants:`` block + # (``correlation_id_consistent_across_traces``, etc.), not by + # placeholder-binding here. Known shape-only placeholders: + # ````, ````, ````, + # ````, ````, + # ````. 
+ if isinstance(expected, str) and expected.startswith("<") and expected.endswith(">"): assert isinstance(actual, str) and len(actual) > 0, ( f"{label}: expected non-empty string, got {actual!r}" ) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 06716f6..023146b 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.9.0" - assert openarmature.__spec_version__ == "0.26.0" + assert openarmature.__spec_version__ == "0.26.1" def test_spec_version_matches_pyproject() -> None: From a8d155240271dac4f75988291596524cc807e110 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 18:22:21 -0700 Subject: [PATCH 2/4] Wire subgraph_identity through engine + observers (Option A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the coord-thread `clarify-subgraph-name-semantics` resolution (msg 02): `metadata.subgraph_name` carries the compiled subgraph's identity rather than the wrapper node name. Engine surface: - `SubgraphNode` gains `subgraph_identity: str | None = None` (optional, BC-preserving). - `FanOutConfig` gains `subgraph_identity: str | None = None` for the symmetric fan-out case; threaded through `GraphBuilder.add_fan_out_node`. - `_InvocationContext` gains `subgraph_identities: tuple[str | None, ...]`, parallel to `namespace_prefix` — index `i` is the identity of the wrapper at `namespace_prefix[i]`. `descend_into_subgraph` / `descend_into_fan_out_instance` / `descend_into_parallel_branch` extend the chain. - `NodeEvent.subgraph_identities: tuple[str | None, ...] = ()` carries the chain at event-emission time so observers can read the identity at each wrapper depth. Observer surface: - Langfuse `LangfuseObserver` and OTel `OTelObserver` both gain `_subgraph_identity_at(event, depth)` and emit identity (or empty string when None) as `metadata.subgraph_name` / `openarmature. 
subgraph.name` on wrapper observations / spans, per-instance fan-out dispatch observations, and detached-trace wrapper observations. - Detached subgraph's wrapper observation name now also uses the identity (falling back to the wrapper node name when None). - Langfuse wrapper observation synthesis now carries `namespace`, `step`, and `attempt_index` metadata (step from the first inner event; attempt_index hardcoded 0 since wrappers don't retry). Conformance adapter: - `_TracingSubgraphNode` and `_add_fan_out_node` set `subgraph_identity` from the fixture's `subgraphs:` block key when compiling, so every fixture-built subgraph carries its declared identity. Updates the pre-existing OTel-side test on fixture 002 to assert `openarmature.subgraph.name == "inner"` (identity) rather than `"outer_sub"` (wrapper node name) — the prior assertion was inconsistent with Option A semantics; the spec-side fixture 002 YAML doesn't assert this attribute, so the python test was a stricter local check that needed correction. Langfuse fixtures 031/032/033 stay deferred pending two additional spec/fixture ambiguities surfaced while wiring this up: (a) the `step` value on `outer_out` (fixture 031 says 2, but graph-engine §6 says inner-subgraph node executions increment the same counter so the engine emits 3) and (b) whether the `namespace` metadata on detached-trace inner observations should be rewritten to use the subgraph identity at the wrapper position. Both queued in coord thread `clarify-subgraph-name-semantics` msg 03. 
--- src/openarmature/graph/builder.py | 2 + src/openarmature/graph/compiled.py | 3 + src/openarmature/graph/events.py | 14 ++++ src/openarmature/graph/fan_out.py | 10 +++ src/openarmature/graph/observer.py | 24 +++++++ src/openarmature/graph/subgraph.py | 12 ++++ .../observability/langfuse/observer.py | 64 +++++++++++++++++-- .../observability/otel/observer.py | 30 +++++++-- tests/conformance/adapter.py | 2 + tests/conformance/test_observability.py | 7 +- .../test_observability_langfuse.py | 27 +++++--- 11 files changed, 175 insertions(+), 20 deletions(-) diff --git a/src/openarmature/graph/builder.py b/src/openarmature/graph/builder.py index f65ab6b..d3cc18c 100644 --- a/src/openarmature/graph/builder.py +++ b/src/openarmature/graph/builder.py @@ -148,6 +148,7 @@ def add_fan_out_node[ChildT: State]( instance_middleware: Iterable[Middleware] | None = None, errors_field: str | None = None, middleware: Iterable[Middleware] | None = None, + subgraph_identity: str | None = None, ) -> Self: """Register a fan-out node. 
@@ -262,6 +263,7 @@ def add_fan_out_node[ChildT: State]( extra_outputs=dict(extra_outputs or {}), instance_middleware=tuple(instance_middleware or ()), errors_field=errors_field, + subgraph_identity=subgraph_identity, ) # FanOutNode satisfies the Node[StateT] structural protocol (run # returns a partial update; name and middleware are present), diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py index 31e6aa7..63ff899 100644 --- a/src/openarmature/graph/compiled.py +++ b/src/openarmature/graph/compiled.py @@ -1987,6 +1987,7 @@ def _dispatch_started( fan_out_index=context.fan_out_index, fan_out_config=fan_out_config, branch_name=current_branch_name(), + subgraph_identities=context.subgraph_identities, ), ) @@ -2020,6 +2021,7 @@ def _dispatch_completed( fan_out_index=context.fan_out_index, fan_out_config=fan_out_config, branch_name=current_branch_name(), + subgraph_identities=context.subgraph_identities, ), ) @@ -2202,5 +2204,6 @@ async def _maybe_save_checkpoint( parent_states=context.parent_states_prefix, attempt_index=attempt_index, fan_out_index=None, + subgraph_identities=context.subgraph_identities, ), ) diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 9ca1a52..795fd8e 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -191,6 +191,20 @@ class NodeEvent: # simultaneously when a branch's subgraph contains a fan-out # (and vice versa). branch_name: str | None = None + # Per observability §5.3 + the coord-thread + # ``clarify-subgraph-name-semantics`` resolution: chain of + # compiled-subgraph identities parallel to the wrapper-depth + # positions of ``namespace``. Index ``i`` is the identity for + # the wrapper at ``namespace[i]`` (or ``None`` when that + # wrapper has no tracked identity); chain length equals the + # depth of wrapper nesting (always ``< len(namespace)`` since + # the last element of ``namespace`` is the current node, not + # a wrapper). 
Observers read by depth and emit it as + # ``observation.metadata.subgraph_name`` (Langfuse) / + # ``openarmature.subgraph.name`` (OTel), falling back to the + # empty string when ``None`` per §5.3's "if the implementation + # tracks one" clause. + subgraph_identities: tuple[str | None, ...] = () __all__ = ["FanOutEventConfig", "NodeEvent"] diff --git a/src/openarmature/graph/fan_out.py b/src/openarmature/graph/fan_out.py index eb0ff70..de236c6 100644 --- a/src/openarmature/graph/fan_out.py +++ b/src/openarmature/graph/fan_out.py @@ -80,6 +80,15 @@ class FanOutConfig: extra_outputs: Mapping[str, str] = field(default_factory=dict[str, str]) instance_middleware: tuple[Middleware, ...] = () errors_field: str | None = None + # The identity of the compiled inner subgraph (the key under + # which the subgraph is declared in a ``subgraphs:`` registry). + # Threaded onto every per-instance event so observers can emit + # ``observation.metadata.subgraph_name`` on each per-instance + # dispatch observation (Langfuse) / + # ``openarmature.subgraph.name`` on the corresponding span + # (OTel). Optional and BC-preserving — direct callers that don't + # supply it get the empty-string fallback per observability §5.3. + subgraph_identity: str | None = None @dataclass(frozen=True) @@ -271,6 +280,7 @@ async def run_instance(idx: int, instance_state: ChildT) -> Mapping[str, Any]: parent_state=state, sub_attached=tuple(cfg.subgraph._attached_observers), # noqa: SLF001 fan_out_index=idx, + subgraph_identity=cfg.subgraph_identity, ) async def innermost(s: ChildT) -> Mapping[str, Any]: diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index 31d2ea1..efaae55 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -336,6 +336,18 @@ class _InvocationContext: step_counter: list[int] = field(default_factory=lambda: [0]) namespace_prefix: tuple[str, ...] = () parent_states_prefix: tuple[State, ...] 
= () + # Per observability §5.3 + the coord-thread `clarify-subgraph-name- + # semantics` resolution. Parallel to ``namespace_prefix`` — index + # ``i`` is the compiled-subgraph identity for the wrapper at + # ``namespace_prefix[i]``, or ``None`` for wrappers constructed + # without an identity. Used by observers to emit + # ``metadata.subgraph_name`` (Langfuse) and + # ``openarmature.subgraph.name`` (OTel) on the wrapper observation + # / span at each depth. The chain shape lets nested subgraphs + # carry distinct identities at distinct depths even though + # v0.10.0's conformance fixtures only exercise single-level + # nesting. + subgraph_identities: tuple[str | None, ...] = () # Per pipeline-utilities §9 + graph-engine §6: nodes inside a # fan-out instance fire events tagged with the instance's 0-based # index. Set when descending into a fan-out instance, inherited @@ -426,6 +438,8 @@ def descend_into_subgraph( subgraph_node_name: str, parent_state: State, sub_attached: tuple[SubscribedObserver, ...], + *, + subgraph_identity: str | None = None, ) -> _InvocationContext: """Build the context for a subgraph-as-node call. @@ -447,6 +461,7 @@ def descend_into_subgraph( step_counter=self.step_counter, namespace_prefix=self.namespace_prefix + (subgraph_node_name,), parent_states_prefix=self.parent_states_prefix + (parent_state,), + subgraph_identities=self.subgraph_identities + (subgraph_identity,), fan_out_index=self.fan_out_index, invocation_id=self.invocation_id, correlation_id=self.correlation_id, @@ -466,6 +481,8 @@ def descend_into_fan_out_instance( parent_state: State, sub_attached: tuple[SubscribedObserver, ...], fan_out_index: int, + *, + subgraph_identity: str | None = None, ) -> _InvocationContext: """Build the context for one fan-out instance's subgraph invocation. 
@@ -491,6 +508,7 @@ def descend_into_fan_out_instance( step_counter=self.step_counter, namespace_prefix=self.namespace_prefix + (fan_out_node_name,), parent_states_prefix=self.parent_states_prefix + (parent_state,), + subgraph_identities=self.subgraph_identities + (subgraph_identity,), fan_out_index=fan_out_index, invocation_id=self.invocation_id, correlation_id=self.correlation_id, @@ -541,6 +559,12 @@ def descend_into_parallel_branch( step_counter=self.step_counter, namespace_prefix=self.namespace_prefix + (parallel_branches_node_name,), parent_states_prefix=self.parent_states_prefix + (parent_state,), + # Parallel-branches don't reify a single inner subgraph + # identity at the wrapper position — each branch can hold a + # different subgraph — so we extend the chain with ``None`` + # at this depth. Per-branch identity handling (if ever + # needed) is a future addition. + subgraph_identities=self.subgraph_identities + (None,), fan_out_index=self.fan_out_index, invocation_id=self.invocation_id, correlation_id=self.correlation_id, diff --git a/src/openarmature/graph/subgraph.py b/src/openarmature/graph/subgraph.py index 63e8ad9..b0a23a1 100644 --- a/src/openarmature/graph/subgraph.py +++ b/src/openarmature/graph/subgraph.py @@ -50,6 +50,17 @@ class SubgraphNode[ParentT: State, ChildT: State]: default_factory=FieldNameMatching[ParentT, ChildT] ) middleware: tuple[Middleware, ...] = field(default_factory=tuple[Middleware, ...]) + # The compiled subgraph's identity (the registry key under which + # the subgraph is declared, distinct from the wrapper node's + # ``name`` in the parent graph). Optional and BC-preserving: + # callers that don't pass it get an empty string emitted as the + # observability §5.3 ``subgraph_name`` attribute (matching the + # spec's "if the implementation tracks one" fallback). 
Setting + # it lets dashboards filter or aggregate across observations from + # the same compiled subgraph wrapped under different node names + # (e.g., a ``validator`` subgraph used as both ``validate_input`` + # and ``validate_output``). + subgraph_identity: str | None = None async def run( self, @@ -116,6 +127,7 @@ async def run( subgraph_node_name=self.name, parent_state=state, sub_attached=tuple(self.compiled._attached_observers), + subgraph_identity=self.subgraph_identity, ) sub_final = await self.compiled._invoke(sub_initial, child_context) return self.projection.project_out(sub_final, state, self.compiled.state_cls) diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 9756604..8a2ddc1 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -80,6 +80,28 @@ def _empty_str_frozenset() -> frozenset[str]: return frozenset() +def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: + """Return the compiled-subgraph identity for the wrapper at the + given 1-based namespace depth, or the empty string when no + identity is tracked at that depth. + + Per observability §5.3 + the coord-thread + ``clarify-subgraph-name-semantics`` resolution: the empty-string + fallback matches the spec's "if the implementation tracks one" + clause for implementations / direct ``SubgraphNode(...)`` callers + that don't wire an identity through. Conformance fixtures + 031/032/033 lock identity as the required value; the empty-string + path keeps direct callers conformant with §5.3 but failing those + fixtures. + """ + idx = depth - 1 + if 0 <= idx < len(event.subgraph_identities): + identity = event.subgraph_identities[idx] + if identity is not None: + return identity + return "" + + @dataclass class _InvState: """Per-invocation state, isolated by invocation_id. 
@@ -453,7 +475,7 @@ def _sync_subgraph_observations( # configured detached_subgraphs name → mint a fresh # detached Trace + open the dispatch observation in it. if depth == 1 and prefix[0] in self.detached_subgraphs: - self._open_detached_subgraph_trace(inv_state, correlation_id, prefix) + self._open_detached_subgraph_trace(inv_state, correlation_id, prefix, event) continue # Detached fan-out: the fan-out instance gets its own # Trace per spec §8.5. The fan-out node's Span observation @@ -478,13 +500,14 @@ def _sync_subgraph_observations( self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event) continue # Plain non-detached subgraph dispatch. - self._open_subgraph_observation(inv_state, correlation_id, prefix) + self._open_subgraph_observation(inv_state, correlation_id, prefix, event) def _open_subgraph_observation( self, inv_state: _InvState, correlation_id: str | None, prefix: tuple[str, ...], + event: NodeEvent, ) -> None: # Parent is the nearest enclosing subgraph dispatch (if any), # else None (the Trace is the implicit parent for top-level @@ -496,7 +519,18 @@ def _open_subgraph_observation( if sg is not None: parent_observation_id = sg.handle.id break - metadata: dict[str, Any] = {"subgraph_name": prefix[-1]} + # Subgraph wrappers don't dispatch their own events, so the + # synthetic wrapper observation inherits its scalar metadata + # from the FIRST inner event that triggered the synthesis. + # ``attempt_index`` is hardcoded to 0: the wrapper has no + # engine-managed retry counter of its own (inner nodes own + # their own attempt_index independently). 
+ metadata: dict[str, Any] = { + "namespace": list(prefix), + "step": event.step, + "attempt_index": 0, + "subgraph_name": _subgraph_identity_at(event, len(prefix)), + } if correlation_id is not None: metadata["correlation_id"] = correlation_id handle = self.client.span( @@ -519,9 +553,16 @@ def _open_fan_out_instance_dispatch_observation( fan_out_open = self._find_fan_out_node_observation(inv_state, prefix) parent_observation_id = fan_out_open.handle.id if fan_out_open is not None else None parent_node_name = inv_state.fan_out_parent_node_name.get(prefix, prefix[-1]) + # Per-instance dispatch is synthesized from the first inner + # event inside the instance subtree; inherit scalar metadata + # from that event (same pattern as ``_open_subgraph_observation``). metadata: dict[str, Any] = { + "namespace": list(prefix), + "step": event.step, + "attempt_index": 0, "fan_out_parent_node_name": parent_node_name, "fan_out_index": event.fan_out_index, + "subgraph_name": _subgraph_identity_at(event, len(prefix)), } if correlation_id is not None: metadata["correlation_id"] = correlation_id @@ -539,6 +580,7 @@ def _open_detached_subgraph_trace( inv_state: _InvState, correlation_id: str | None, prefix: tuple[str, ...], + event: NodeEvent, ) -> None: # Mint a fresh Trace for the detached subtree. The main Trace's # dispatch observation surfaces the link via @@ -604,16 +646,26 @@ def _open_detached_subgraph_trace( detached_metadata: dict[str, Any] = {"detached_from_invocation_id": inv_state.trace_id} if correlation_id is not None: detached_metadata["correlation_id"] = correlation_id - self.client.trace(id=detached_trace_id, name=prefix[-1], metadata=detached_metadata) + identity = _subgraph_identity_at(event, len(prefix)) + # The detached trace's wrapper observation IS the migrated + # SubgraphNode wrapper. 
Per the resolution in coord thread + # ``clarify-subgraph-name-semantics`` and fixture 033's + # expected shape, its name and ``metadata.subgraph_name`` use + # the compiled-subgraph identity (e.g., ``"long_running_workflow"``) + # rather than the wrapper node name. Falls back to the wrapper + # node name when identity is empty (observability §5.3's + # "if the implementation tracks one" clause). + wrapper_obs_name = identity or prefix[-1] + self.client.trace(id=detached_trace_id, name=wrapper_obs_name, metadata=detached_metadata) dispatch_metadata: dict[str, Any] = { - "subgraph_name": prefix[-1], + "subgraph_name": identity, "detached": True, } if correlation_id is not None: dispatch_metadata["correlation_id"] = correlation_id handle = self.client.span( trace_id=detached_trace_id, - name=prefix[-1], + name=wrapper_obs_name, metadata=dispatch_metadata, parent_observation_id=None, ) diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 6fa7d46..63f2233 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -148,6 +148,25 @@ def _read_spec_version() -> str: return __spec_version__ +def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: + """Return the compiled-subgraph identity for the wrapper at the + given 1-based namespace depth, or the empty string when no + identity is tracked at that depth. + + Per observability §5.3 + the coord-thread + ``clarify-subgraph-name-semantics`` resolution: empty-string + fallback matches the spec's "if the implementation tracks one" + clause for callers using ``SubgraphNode(name=..., compiled=...)`` + without supplying ``subgraph_identity``. 
+ """ + idx = depth - 1 + if 0 <= idx < len(event.subgraph_identities): + identity = event.subgraph_identities[idx] + if identity is not None: + return identity + return "" + + def _empty_str_frozenset() -> frozenset[str]: """Typed empty frozenset factory for ``detached_subgraphs`` / ``detached_fan_outs`` defaults.""" @@ -1033,7 +1052,7 @@ def _sync_subgraph_spans( # If this prefix's first segment is configured as a # detached subgraph, mint a fresh trace. if depth == 1 and prefix[0] in self.detached_subgraphs: - self._open_detached_subgraph_root(inv_state, invocation_id, correlation_id, prefix) + self._open_detached_subgraph_root(inv_state, invocation_id, correlation_id, prefix, event) continue # If this is a fan-out instance namespace (event.fan_out_index # populated, prefix == namespace[:1]), and the fan-out @@ -1057,7 +1076,7 @@ def _sync_subgraph_spans( ): self._open_fan_out_instance_dispatch_span(inv_state, correlation_id, prefix, event) continue - self._open_subgraph_span(inv_state, invocation_id, correlation_id, prefix) + self._open_subgraph_span(inv_state, invocation_id, correlation_id, prefix, event) def _open_subgraph_span( self, @@ -1065,6 +1084,7 @@ def _open_subgraph_span( invocation_id: str, correlation_id: str | None, prefix: tuple[str, ...], + event: NodeEvent, ) -> None: """Open a synthetic subgraph dispatch span for the given namespace prefix. 
Parent is the next-outer subgraph span (or @@ -1088,7 +1108,7 @@ def _open_subgraph_span( parent_ctx = set_span_in_context(inv.span) attrs: dict[str, Any] = { "openarmature.node.name": prefix[-1], - "openarmature.subgraph.name": prefix[-1], + "openarmature.subgraph.name": _subgraph_identity_at(event, len(prefix)), } if correlation_id is not None: attrs["openarmature.correlation_id"] = correlation_id @@ -1114,6 +1134,7 @@ def _open_detached_subgraph_root( invocation_id: str, correlation_id: str | None, prefix: tuple[str, ...], + event: NodeEvent, ) -> None: """Mint a fresh trace for a detached subgraph entry. The detached root span lives in the new trace; the parent trace's @@ -1141,7 +1162,7 @@ def _open_detached_subgraph_root( parent_ctx_for_dispatch = set_span_in_context(inv.span) attrs_parent: dict[str, Any] = { "openarmature.node.name": prefix[-1], - "openarmature.subgraph.name": prefix[-1], + "openarmature.subgraph.name": _subgraph_identity_at(event, len(prefix)), } if correlation_id is not None: attrs_parent["openarmature.correlation_id"] = correlation_id @@ -1264,6 +1285,7 @@ def _open_fan_out_instance_dispatch_span( "openarmature.node.name": prefix[-1], "openarmature.fan_out.parent_node_name": parent_node_name, "openarmature.node.fan_out_index": event.fan_out_index, + "openarmature.subgraph.name": _subgraph_identity_at(event, len(prefix)), } if correlation_id is not None: attrs["openarmature.correlation_id"] = correlation_id diff --git a/tests/conformance/adapter.py b/tests/conformance/adapter.py index bbdadc0..de3c92d 100644 --- a/tests/conformance/adapter.py +++ b/tests/conformance/adapter.py @@ -669,6 +669,7 @@ def build_graph( projection=projection, trace_list=trace, middleware=per_node_mw, + subgraph_identity=sub_name, ) continue if "fan_out" in node_spec: @@ -936,6 +937,7 @@ def _add_fan_out_node( extra_outputs=cfg.get("extra_outputs"), errors_field=cfg.get("errors_field"), instance_middleware=instance_middleware, + subgraph_identity=sub_name, ) # 
Swap the registered FanOutNode for a tracing variant so the diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 9c537ac..9069bd0 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -324,7 +324,12 @@ async def _run_fixture_002(spec: Mapping[str, Any]) -> None: assert sub_dispatch.context is not None sub_dispatch_id = sub_dispatch.context.span_id sub_dispatch_attrs = dict(sub_dispatch.attributes or {}) - assert sub_dispatch_attrs.get("openarmature.subgraph.name") == "outer_sub" + # Per observability §5.3 + coord thread `clarify-subgraph-name- + # semantics` Option A: `openarmature.subgraph.name` carries the + # compiled subgraph's identity, NOT the wrapper node name. The + # conformance adapter sets ``subgraph_identity = "inner"`` when + # compiling the fixture's ``subgraph: { name: inner }`` block. + assert sub_dispatch_attrs.get("openarmature.subgraph.name") == "inner" # Inner-node spans parent under the subgraph dispatch span and # carry the nested namespace. diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 49d880d..584d6b2 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -46,15 +46,24 @@ "022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage", - # 031 / 032 / 033 (proposal 0035, spec v0.26.1): harness - # extension below (subgraph / fan_out / detached-trace topology) - # already runs these end-to-end, but the fixtures assert - # `metadata.subgraph_name = <compiled subgraph identity>` while both the - # Langfuse + OTel observers emit `subgraph_name = <wrapper - # node name>`. Resolution queued in coord thread - # `clarify-subgraph-name-semantics`; un-defer (and apply any - # required impl change) once spec picks among the three - # options laid out there. + # 031 / 032 / 033 — proposal 0035 (spec v0.26.1).
The + # subgraph_identity wiring (per coord thread + # `clarify-subgraph-name-semantics` msg 02 — Option A) is + # landed: SubgraphNode.subgraph_identity / FanOutConfig. + # subgraph_identity flow through NodeEvent.subgraph_identities + # to observer-side metadata.subgraph_name emission. Two + # remaining spec/fixture ambiguities block fixture activation: + # (1) ``step`` semantics on the wrapper synth observation vs. + # ``outer_out``: fixture 031 expects ``outer_out`` at step 2 + # but graph-engine §6 says "subgraph-internal node executions + # increment the same counter" so the python engine emits + # step 3 (outer_in=0, inner_x=1, inner_y=2, outer_out=3). + # (2) ``namespace`` rewrite for observations inside a + # detached trace: fixture 033 case 1 expects + # ``namespace: ["long_running_workflow", "step"]`` (using + # subgraph identity for the wrapper component) but the + # engine's event carries the wrapper node name (``"dispatch"``). + # Both queued for spec input via a follow-up coord thread. } ) From d2cc2a9faa0b1267d555eb390eeabb7eb3dfba68 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 18:36:24 -0700 Subject: [PATCH 3/4] Address PR 85 review: header tweak + two coverage tests Updates the module-header comment in tests/conformance/test_observability_langfuse.py to clarify that the harness *infrastructure* supports the 031/032/033 topology shapes while activation is currently deferred. Drops the forward-reference to fixture 027 in an internal comment that referenced fixtures this module doesn't currently run. Adds two focused unit tests for the observer behavior changes that were only exercised by deferred conformance fixtures: - test_entry_node_resolves_to_wrapper_when_entry_is_subgraph pins the entry_node correctness fix: when the outer entry IS a SubgraphNode, the first event arrives from inside the subgraph (event.namespace = (wrapper, inner), node_name = inner). 
Both trace.name and trace.metadata.entry_node must resolve to the wrapper, not the inner node. - test_detached_subgraph_subgraph_name_placement pins the audit fix from coord-thread msg 07: in detached mode the wrapper role migrates to the detached trace. The parent trace's link observation must not carry subgraph_name; the detached trace's dispatch observation must. Both tests sit alongside the existing dispatch-synthesis tests. --- .../test_observability_langfuse.py | 19 ++++-- tests/unit/test_observability_langfuse.py | 60 +++++++++++++++++++ 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 584d6b2..c627f44 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -1,8 +1,15 @@ -# Spec mapping (observability §8): drives the Langfuse mapping fixtures -# (022/023/024 basic/generation/prompt-linkage, plus 031/032/033 graph -# topology) against the in-memory LangfuseObserver client. Sibling of -# test_observability.py (OTel mapping); shares no harness state with -# the OTel side — each fixture builds its own graph + observer instance. +# Spec mapping (observability §8): drives the Langfuse mapping +# fixtures (022 basic-trace, 023 generation-rendering, 024 +# prompt-linkage) against the in-memory LangfuseObserver client. +# Sibling of test_observability.py (OTel mapping); shares no harness +# state with the OTel side — each fixture builds its own graph + +# observer instance. +# +# The harness also supports the graph-topology shapes used by +# 031/032/033 (subgraph / fan-out / detached-trace) via the +# cross-capability adapter.build_graph helper, but activation of +# those three fixtures is currently deferred — see the +# `_LANGFUSE_FIXTURES` frozenset comment for the gating questions. 
"""Run spec observability Langfuse conformance fixtures.""" @@ -245,7 +252,7 @@ async def _run_case(case: Mapping[str, Any]) -> None: # ---- Graph build # Two paths: topology fixtures (031/032/033) need the full # ``adapter.build_graph`` machinery for subgraph / fan_out shapes; - # LLM/prompt fixtures (022/023/024/027) use the simpler hand-rolled + # LLM/prompt fixtures (022/023/024) use the simpler hand-rolled # per-node build that knows about ``calls_llm`` / ``renders_prompt``. if _has_topology_constructs(case): subgraphs = _compile_subgraphs(case) diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 7fe0ad9..0a7249b 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -196,6 +196,32 @@ def _find_observation(trace: LangfuseTrace, name: str) -> LangfuseObservation: raise AssertionError(f"observation {name!r} not in trace {trace.id!r}") +async def test_entry_node_resolves_to_wrapper_when_entry_is_subgraph() -> None: + # When the outer entry IS a SubgraphNode, the first event the + # observer sees comes from inside the subgraph + # (event.namespace = (wrapper, inner), event.node_name = inner). + # `entry_node` and trace.name MUST resolve to the wrapper node + # name (event.namespace[0]), not the inner node name. 
+ inner = ( + GraphBuilder(_S) + .add_node("inner_a", lambda _s: _record("inner_a")) + .add_edge("inner_a", END) + .set_entry("inner_a") + .compile() + ) + parent = GraphBuilder(_S).add_subgraph_node("sub", inner).add_edge("sub", END).set_entry("sub").compile() + graph, client, _ = _attach(parent) + + await graph.invoke(_S()) + await graph.drain() + + trace = next(iter(client.traces.values())) + assert trace.name == "sub", f"trace name should be the wrapper, got {trace.name!r}" + assert trace.metadata.get("entry_node") == "sub", ( + f"entry_node should be the wrapper, got {trace.metadata.get('entry_node')!r}" + ) + + async def test_subgraph_dispatch_observation_parents_inner_node() -> None: inner = ( GraphBuilder(_S) @@ -294,6 +320,40 @@ async def test_detached_subgraph_opens_separate_trace() -> None: assert inner_node.parent_observation_id == detached_dispatch.id +async def test_detached_subgraph_subgraph_name_placement() -> None: + # Per coord thread `discuss-observability-langfuse-mapping` msg 07 + # and the wrapper-role-migration framing: in detached mode the + # wrapper role migrates to the detached trace. The parent trace's + # link observation IS the SubgraphNode span (no wrapper role) and + # MUST NOT carry `subgraph_name`. The detached trace's dispatch + # observation IS the migrated wrapper and MUST carry it. 
+ inner = ( + GraphBuilder(_S) + .add_node("inner_a", lambda _s: _record("inner_a")) + .add_edge("inner_a", END) + .set_entry("inner_a") + .compile() + ) + parent = GraphBuilder(_S).add_subgraph_node("sub", inner).add_edge("sub", END).set_entry("sub").compile() + graph, client, _ = _attach_with_detached(parent, detached_subgraphs=frozenset({"sub"})) + + await graph.invoke(_S()) + await graph.drain() + + main = next(t for t in client.traces.values() if "detached_from_invocation_id" not in t.metadata) + detached = next(t for t in client.traces.values() if "detached_from_invocation_id" in t.metadata) + + link_obs = _find_observation(main, "sub") + assert "subgraph_name" not in link_obs.metadata, ( + f"link observation MUST NOT carry subgraph_name; got {link_obs.metadata!r}" + ) + + detached_dispatch = _find_observation(detached, "sub") + assert "subgraph_name" in detached_dispatch.metadata, ( + f"detached dispatch MUST carry subgraph_name; got {detached_dispatch.metadata!r}" + ) + + async def test_detached_fan_out_each_instance_gets_trace() -> None: async def _worker(_s: _WorkerState) -> Any: return {"result": "done"} From dd55ae2924d3c93f49627dc8eeb9a95a5e2b8f38 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 18:50:13 -0700 Subject: [PATCH 4/4] Address PR 85 review round 2: comment + per-trace invariants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Clarifies the detached-trace wrapper comment in the Langfuse observer to make the BC-path asymmetry explicit: the observation NAME falls back to the wrapper node name when subgraph_identity is empty (UX choice — better than an empty-string observation name), but metadata.subgraph_name stays empty per §5.3's "empty string when no identity is tracked" contract. This lets dashboard filters on `metadata.subgraph_name == "X"` match only wrappers explicitly registered with `subgraph_identity = "X"`, not every wrapper that happens to be named X. 
Fixes a latent bug in the multi-trace conformance assertion path: _assert_multi_traces was passing `expected_invariants={}` to _assert_trace, dropping per-trace invariants like correlation_id_consistency. Fixture 033 doesn't currently use any per-trace invariants, so the bug was dormant — any future multi-trace fixture using correlation_id_consistency would silently pass when broken. Adds _PER_TRACE_INVARIANTS to classify invariants; the multi-trace runner filters expected_invariants to that set when delegating per-Trace assertions, leaving cross-trace invariants to _assert_multi_traces. --- .../observability/langfuse/observer.py | 20 ++++++++++++++----- .../test_observability_langfuse.py | 12 ++++++++++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 8a2ddc1..4ea7bae 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -650,11 +650,21 @@ def _open_detached_subgraph_trace( # The detached trace's wrapper observation IS the migrated # SubgraphNode wrapper. Per the resolution in coord thread # ``clarify-subgraph-name-semantics`` and fixture 033's - # expected shape, its name and ``metadata.subgraph_name`` use - # the compiled-subgraph identity (e.g., ``"long_running_workflow"``) - # rather than the wrapper node name. Falls back to the wrapper - # node name when identity is empty (observability §5.3's - # "if the implementation tracks one" clause). + # expected shape, the observation name uses the compiled- + # subgraph identity (e.g., ``"long_running_workflow"``); its + # ``metadata.subgraph_name`` carries the same identity. 
+ # + # When the identity is empty (BC path — ``SubgraphNode`` + # constructed without ``subgraph_identity``), the two + # diverge intentionally: the observation NAME falls back to + # the wrapper node name (an empty observation name is worse + # UX than a wrapper-named one), but ``metadata.subgraph_name`` + # stays empty per §5.3's "empty string when no identity is + # tracked" contract. Filtering on + # ``metadata.subgraph_name == "X"`` then matches only + # wrappers explicitly registered with + # ``subgraph_identity = "X"``, not every wrapper that + # happens to be named ``X``. wrapper_obs_name = identity or prefix[-1] self.client.trace(id=detached_trace_id, name=wrapper_obs_name, metadata=detached_metadata) dispatch_metadata: dict[str, Any] = { diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index c627f44..df958e9 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -449,6 +449,15 @@ def _runtime_config_from_spec(config_spec: dict[str, Any] | None) -> RuntimeConf # --------------------------------------------------------------------------- +# Per-trace invariants — invariants ``_assert_trace`` knows how to +# check on a single Trace. The multi-trace runner filters +# ``expected_invariants`` to this set when delegating per-Trace +# assertions; the rest (``distinct_trace_ids``, +# ``correlation_id_consistent_across_traces``, etc.) stay in +# ``_assert_multi_traces`` as cross-Trace checks. 
+_PER_TRACE_INVARIANTS = frozenset({"trace_id_equals_invocation_id", "correlation_id_consistency"}) + + def _assert_multi_traces( client: InMemoryLangfuseClient, expected: dict[str, Any], @@ -498,7 +507,8 @@ def _assert_multi_traces( ) trace = matching[0] if matching else candidates[0] consumed.add(trace.id) - _assert_trace(trace, exp, expected_invariants={}) + per_trace_invariants = {k: v for k, v in expected_invariants.items() if k in _PER_TRACE_INVARIANTS} + _assert_trace(trace, exp, expected_invariants=per_trace_invariants) # Invariants that span multiple Traces. if expected_invariants.get("distinct_trace_ids"):