diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index d79bc4e..5960a2f 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -297,6 +297,17 @@ def _handle_completed(self, event: NodeEvent) -> None: span.set_status(Status(StatusCode.ERROR, description=event.error.category)) span.record_exception(event.error) span.set_attribute("openarmature.error.category", event.error.category) + # Per spec §4.2 / fixture 003: the invocation span MUST + # end with ERROR status when any child node errors. OTel + # doesn't auto-propagate child status to parents — we set + # it explicitly here. The OTel SDK's status-precedence + # rule preserves ERROR through any subsequent + # ``set_status(OK)`` calls (only UNSET → OK transitions + # are honoured), so the close path's UNSET-leave still + # works for clean invocations. + inv_open = self._invocation_span.get(invocation_id) + if inv_open is not None: + inv_open.span.set_status(Status(StatusCode.ERROR, description=event.error.category)) else: span.set_status(Status(StatusCode.OK)) span.end() @@ -839,11 +850,18 @@ def _close_invocation_span(self, invocation_id: str) -> None: open_span = self._invocation_span.pop(invocation_id, None) if open_span is None: return - # Status defaults to OK for completed invocations; if the - # engine surfaced an error, the failing node's span - # already carries it and OTel propagates ERROR up the - # parent chain on its own. - open_span.span.set_status(Status(StatusCode.OK)) + # Don't unconditionally call ``set_status(OK)`` here. OTel + # doesn't auto-propagate child span status to parents, so + # the spec §4.2 / fixture 003 contract ("invocation span + # ends ERROR when a child errored") is satisfied by + # ``_handle_completed`` setting ERROR on this span when an + # error event fires. Calling ``set_status(OK)`` here would + # be a no-op when ERROR was already set (OTel SDK + # status-precedence preserves ERROR), but it's clearer to + # leave the status UNSET in the clean-completion path — + # exporters map UNSET to OK by convention, and the explicit + # ERROR-set in ``_handle_completed`` handles the failure + # path. open_span.span.end() def shutdown(self) -> None: diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 55aa71d..8fd5d3e 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -1,30 +1,46 @@ """Run spec observability conformance fixtures (001-011) against OTelObserver. -Phase 6.0 scope (this PR): - -- **001-basic-trace** — full span shape (private TracerProvider, - correlation_id auto-generation, parent-child hierarchy, span names, - base attribute set). -- **005-llm-provider-span-nested** — §5.5 LLM span emission + - ``disable_llm_spans`` opt-out + §6 TracerProvider isolation under - active external auto-instrumentation. The load-bearing - isolation test. -- **008-detached-trace-mode** — §4.4 detached subgraph + detached - fan-out, with cross-trace ``correlation_id`` consistency. -- **009-correlation-id-cross-cutting** — every span carries - ``openarmature.correlation_id``; back-to-back invocations get - distinct UUIDv4s. - -Phase 6.1 (separate follow-up PR): the remaining 7 fixtures — -002 (subgraph hierarchy), 003 (error status), 004 (routing-error -attribution), 006 (fan-out instance attribution), 007 (retry -attempt spans), 010 (log correlation full assertions), 011 -(determinism). Per-fixture wiring notes live in +Driven fixtures: + +- **001-basic-trace** (Phase 6.0) — full span shape. +- **002-subgraph-hierarchy** (PR-C) — synthetic dispatch span + + inner-node parenting per §4.5. +- **003-error-status** (PR-C) — §4.2 ERROR status mapping for the + ``node_exception`` case. +- **005-llm-provider-span-nested** (Phase 6.0) — §5.5 LLM span + + ``disable_llm_spans`` opt-out + §6 TracerProvider isolation. +- **007-retry-attempt-spans** (PR-C) — sibling attempt spans with + per-attempt ``attempt_index`` under retry middleware. +- **008-detached-trace-mode** (Phase 6.0) — §4.4 detached subgraph + + detached fan-out + cross-trace ``correlation_id``. +- **009-correlation-id-cross-cutting** (Phase 6.0) — every span + carries ``openarmature.correlation_id``; back-to-back + invocations get distinct UUIDv4s. +- **011-determinism** (PR-C) — deterministic span content + (hierarchy, names, status, attributes minus the canonical + non-deterministic-by-design list) is identical across runs. + +Deferred: + +- **004-routing-error-attribution** — needs the proposal-0012 + ordering swap (completed dispatch after edge eval) so the + preceding node's ``completed`` event carries the routing-error + status. Lands in PR-C.1 once v0.9.0 ships. +- **006-fan-out-instance-attribution** — needs non-detached + fan-out per-instance dispatch span synthesis + ``FanOutConfig`` + metadata surfacing. Lands in PR-C.2. +- **010-log-correlation** — needs the synchronous observer prep + hook (``prepare_sync``) so the engine task can attach the + observer's span to OTel context for the duration of node-body + execution. Lands in PR-C.3. + +Per-fixture wiring notes live in ``docs/phase-6-1-conformance-fillin.md``. """ from __future__ import annotations +import copy import re from collections.abc import Mapping from pathlib import Path @@ -52,41 +68,33 @@ _SUPPORTED_FIXTURES = frozenset( { "001-otel-basic-trace", + "002-otel-subgraph-hierarchy", + "003-otel-error-status", "005-otel-llm-provider-span-nested", + "007-otel-retry-attempt-spans", "008-otel-detached-trace-mode", "009-otel-correlation-id-cross-cutting", + "011-otel-determinism", } ) _DEFERRED_FIXTURES: dict[str, str] = { - "002-otel-subgraph-hierarchy": ( - "subgraph dispatch span synthesis (engine wrapper is transparent per " - "fixture 013); deferred to Phase 6.1" - ), - "003-otel-error-status": ( - "status-mapping table across §4 categories; engine path unit-tested. Deferred to Phase 6.1." - ), "004-otel-routing-error-attribution": ( - "routing errors don't fire their own event pair; preceding-node-span " - "attribution wiring; deferred to Phase 6.1" + "Awaiting proposal 0012 — routing-error attribution requires the §3/§6 " + "ordering swap (completed dispatch after edge eval) so RoutingError lands " + "on the preceding node's completed event. Lands in PR-C.1 once v0.9.0 ships." ), "006-otel-fan-out-instance-attribution": ( - "fan-out instance attribution + namespace prefixing; engine path unit-tested. Deferred to Phase 6.1." - ), - "007-otel-retry-attempt-spans": ( - "retry-middleware wiring in the conformance harness; engine emits " - "per-attempt events correctly. Deferred to Phase 6.1." + "Needs non-detached fan-out per-instance dispatch span synthesis + " + "FanOutConfig metadata surfacing per spec §5.4 (current observer opens one " + "shared synthetic span at the namespace prefix, not per-instance). Lands in PR-C.2." ), "010-otel-log-correlation": ( - "OTel Logs Bridge full conformance assertions (trace_id/span_id " - "population on every record); filter unit-tested. Deferred to " - "Phase 6.1." - ), - "011-otel-determinism": ( - "deterministic-portion span content (hierarchy, names, attributes " - "minus timing, status); checked indirectly by other fixtures. " - "Deferred to Phase 6.1." + "Needs synchronous observer prep hook (prepare_sync) so the engine task can " + "attach the observer's span to OTel context for the duration of node-body " + "execution — observer span creation runs on the worker task today and isn't " + "available synchronously after _dispatch_started. Lands in PR-C.3." ), } @@ -127,12 +135,20 @@ async def test_observability_fixture(fixture_path: Path) -> None: spec = _load(fixture_path) if fixture_id == "001-otel-basic-trace": await _run_fixture_001(spec) + elif fixture_id == "002-otel-subgraph-hierarchy": + await _run_fixture_002(spec) + elif fixture_id == "003-otel-error-status": + await _run_fixture_003(spec) elif fixture_id == "005-otel-llm-provider-span-nested": await _run_fixture_005(spec) + elif fixture_id == "007-otel-retry-attempt-spans": + await _run_fixture_007(spec) elif fixture_id == "008-otel-detached-trace-mode": await _run_fixture_008(spec) elif fixture_id == "009-otel-correlation-id-cross-cutting": await _run_fixture_009(spec) + elif fixture_id == "011-otel-determinism": + await _run_fixture_011(spec) else: raise AssertionError(f"no driver for supported fixture {fixture_id!r}") @@ -219,6 +235,450 @@ async def _run_fixture_001(spec: Mapping[str, Any]) -> None: assert final.trace == expected_trace # type: ignore[attr-defined] +# --------------------------------------------------------------------------- +# Fixture 002 — subgraph hierarchy +# --------------------------------------------------------------------------- + + +async def _run_fixture_002(spec: Mapping[str, Any]) -> None: + """Spec §4.5: the subgraph wrapper synthesizes a dispatch span; + inner-node spans parent under it; the dispatch span parents + under the invocation.""" + observer, exporter = _build_observer() + subgraphs = _compile_subgraphs(spec) + trace_log: list[str] = [] + built = build_graph(spec, subgraphs=subgraphs, trace=trace_log) + compiled = built.builder.compile() + compiled.attach_observer(observer) + initial_state = built.initial_state(spec.get("initial_state", {})) + await compiled.invoke(initial_state) + await compiled.drain() + observer.shutdown() + spans = exporter.get_finished_spans() + + by_name: dict[str, list[Any]] = {} + for s in spans: + by_name.setdefault(s.name, []).append(s) + + # Invocation span at the root. + inv_list = by_name.get("openarmature.invocation") or [] + assert len(inv_list) == 1, f"expected 1 invocation span; got {len(inv_list)}" + inv = inv_list[0] + assert inv.parent is None + assert inv.context is not None + invocation_span_id = inv.context.span_id + + # Top-level outer nodes parent under invocation. + for outer_node in ("outer_in", "outer_out"): + outer_spans = by_name.get(outer_node) or [] + assert len(outer_spans) == 1, f"expected 1 span for {outer_node!r}; got {len(outer_spans)}" + node = outer_spans[0] + assert node.parent is not None and node.parent.span_id == invocation_span_id, ( + f"{outer_node!r} MUST parent under invocation span" + ) + + # The subgraph wrapper synthesizes a dispatch span at namespace + # ("outer_sub",); its parent is the invocation span. + sub_dispatch_spans = by_name.get("outer_sub") or [] + assert len(sub_dispatch_spans) == 1, ( + f"expected 1 synthetic subgraph dispatch span for outer_sub; got {len(sub_dispatch_spans)}" + ) + sub_dispatch = sub_dispatch_spans[0] + assert sub_dispatch.parent is not None and sub_dispatch.parent.span_id == invocation_span_id, ( + "subgraph dispatch span MUST parent under the invocation span per §4.5" + ) + assert sub_dispatch.context is not None + sub_dispatch_id = sub_dispatch.context.span_id + sub_dispatch_attrs = dict(sub_dispatch.attributes or {}) + assert sub_dispatch_attrs.get("openarmature.subgraph.name") == "outer_sub" + + # Inner-node spans parent under the subgraph dispatch span and + # carry the nested namespace. + for inner_node in ("inner_x", "inner_y"): + inner_spans = by_name.get(inner_node) or [] + assert len(inner_spans) == 1, f"expected 1 span for {inner_node!r}; got {len(inner_spans)}" + inner = inner_spans[0] + assert inner.parent is not None and inner.parent.span_id == sub_dispatch_id, ( + f"{inner_node!r} MUST parent under the subgraph dispatch span per §4.5" + ) + inner_attrs = dict(inner.attributes or {}) + assert list(inner_attrs.get("openarmature.node.namespace") or []) == ["outer_sub", inner_node], ( + f"{inner_node!r} namespace MUST be ['outer_sub', '{inner_node}']; got " + f"{inner_attrs.get('openarmature.node.namespace')!r}" + ) + + +# --------------------------------------------------------------------------- +# Fixture 003 — error status mapping (node_exception case) +# --------------------------------------------------------------------------- + + +async def _run_fixture_003(spec: Mapping[str, Any]) -> None: + """Spec §4.2: a node-exception failure produces an ERROR span + with the canonical category in the description, an exception + event recorded, and the ``openarmature.error.category`` + attribute. Sibling spans before the failure stay OK; the + invocation span ends ERROR (OTel doesn't auto-propagate child + status to parents, so the OTelObserver explicitly sets ERROR + on the invocation span when any child errors per + ``_handle_completed``).""" + from opentelemetry.trace import StatusCode + + from openarmature.graph import RuntimeGraphError + + observer, exporter = _build_observer() + trace_log: list[str] = [] + built = build_graph(spec, trace=trace_log) + compiled = built.builder.compile() + compiled.attach_observer(observer) + initial_state = built.initial_state(spec.get("initial_state", {})) + with pytest.raises(RuntimeGraphError): + await compiled.invoke(initial_state) + await compiled.drain() + observer.shutdown() + spans = exporter.get_finished_spans() + + by_name = {s.name: s for s in spans} + + ok_node = by_name.get("ok_node") + assert ok_node is not None + assert ok_node.status.status_code == StatusCode.OK, ( + f"ok_node status MUST be OK; got {ok_node.status.status_code}" + ) + + fail_node = by_name.get("fail_node") + assert fail_node is not None + assert fail_node.status.status_code == StatusCode.ERROR, ( + f"fail_node status MUST be ERROR; got {fail_node.status.status_code}" + ) + assert fail_node.status.description == "node_exception", ( + f"fail_node status_description MUST be 'node_exception'; got {fail_node.status.description!r}" + ) + fail_attrs = dict(fail_node.attributes or {}) + assert fail_attrs.get("openarmature.error.category") == "node_exception" + # Exception event recorded on the span via record_exception. + exception_events = [e for e in fail_node.events if e.name == "exception"] + event_names = [e.name for e in fail_node.events] + assert len(exception_events) >= 1, ( + f"fail_node MUST have at least one 'exception' event recorded; got {event_names}" + ) + + # Invocation span ends ERROR when any child errors per spec + # §4.2 / fixture 003. The OTelObserver sets ERROR explicitly in + # ``_handle_completed`` (OTel doesn't auto-propagate child status + # to parents). + inv = by_name.get("openarmature.invocation") + assert inv is not None + assert inv.status.status_code == StatusCode.ERROR, ( + f"invocation span status MUST be ERROR when a child errored; got {inv.status.status_code}" + ) + + +# --------------------------------------------------------------------------- +# Fixture 007 — retry attempt spans +# --------------------------------------------------------------------------- + + +async def _run_fixture_007(spec: Mapping[str, Any]) -> None: + """Two sub-cases: + + 1. ``three_attempts_third_succeeds`` — retry succeeds on + attempt 2; expect 3 sibling attempt spans (ERROR, ERROR, OK). + 2. ``retry_exhausts_all_three_spans_error`` — retry exhausts; + expect 3 sibling attempt spans (all ERROR); invoke raises. + """ + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + case_name = cast("str", case["name"]) + try: + await _run_fixture_007_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_fixture_007_case(case: Mapping[str, Any]) -> None: + from opentelemetry.trace import StatusCode + + from openarmature.graph import RuntimeGraphError + from openarmature.graph.middleware import RetryMiddleware + from openarmature.graph.middleware.retry import deterministic_backoff + + observer, exporter = _build_observer() + # The fixture's flaky directive uses ``fail_count: N`` shape + # (fail attempts 0..N-1, succeed on attempt N) which the + # adapter doesn't translate; rewrite to the adapter's + # ``failure_sequence`` shape before building. + flaky_node_name = cast("str", case["entry"]) + nodes = cast("dict[str, Any]", case["nodes"]) + flaky_node = cast("dict[str, Any]", nodes[flaky_node_name]) + flaky_directive = cast("dict[str, Any]", flaky_node["flaky"]) + fail_count = int(flaky_directive["fail_count"]) + fail_category = cast("str", flaky_directive.get("category", "provider_unavailable")) + on_success = cast("dict[str, Any]", flaky_directive.get("on_success", {})) + flaky_node["flaky"] = { + "failure_sequence": [ + {"category": fail_category, "message": f"flaky attempt {i}"} for i in range(fail_count) + ], + "success_update": on_success, + } + # Translate the per-node retry middleware. The adapter accepts + # ``node_middleware`` mapping; the YAML's + # ``nodes.flaky.middleware: [{type: retry, ...}]`` maps in. + middleware_specs = cast("list[dict[str, Any]]", flaky_node.pop("middleware", []) or []) + node_middleware: dict[str, list[Any]] = {} + for mw_spec in middleware_specs: + if mw_spec["type"] != "retry": + raise AssertionError(f"fixture 007: unexpected middleware type {mw_spec['type']!r}") + backoff_cfg = cast( + "dict[str, Any]", mw_spec.get("backoff") or {"type": "deterministic", "seconds": 0} + ) + if backoff_cfg["type"] != "deterministic": + raise AssertionError(f"fixture 007: unsupported backoff type {backoff_cfg['type']!r}") + backoff = deterministic_backoff(float(backoff_cfg.get("seconds", 0))) + classifier_cfg = cast("dict[str, Any] | None", mw_spec.get("classifier")) + if classifier_cfg is not None: + transient = frozenset(cast("list[str]", classifier_cfg.get("transient_categories", []))) + + def _classifier(exc: Exception, _state: Any, _transient: frozenset[str] = transient) -> bool: + return getattr(exc, "category", None) in _transient + + classifier_fn: Any = _classifier + else: + classifier_fn = None + node_middleware.setdefault(flaky_node_name, []).append( + RetryMiddleware( + max_attempts=int(mw_spec.get("max_attempts", 3)), + backoff=backoff, + classifier=classifier_fn, + ) + ) + + trace_log: list[str] = [] + built = build_graph(case, trace=trace_log, node_middleware=node_middleware) + compiled = built.builder.compile() + compiled.attach_observer(observer) + initial_state = built.initial_state(case.get("initial_state", {})) + expected_error = case.get("expected_error") + if expected_error is not None: + with pytest.raises(RuntimeGraphError): + await compiled.invoke(initial_state) + else: + await compiled.invoke(initial_state) + await compiled.drain() + observer.shutdown() + spans = exporter.get_finished_spans() + + attempt_spans = [s for s in spans if s.name == flaky_node_name] + assert len(attempt_spans) == 3, ( + f"expected 3 sibling attempt spans for {flaky_node_name!r}; got {len(attempt_spans)}" + ) + # Each attempt span has a distinct attempt_index in 0..2 and + # they all share the invocation as parent (siblings). + attempt_indices: list[int] = [] + parent_span_ids: set[int] = set() + for span in attempt_spans: + attrs = dict(span.attributes or {}) + idx = attrs.get("openarmature.node.attempt_index") + assert isinstance(idx, int) + attempt_indices.append(idx) + assert span.parent is not None + parent_span_ids.add(span.parent.span_id) + assert sorted(attempt_indices) == [0, 1, 2], ( + f"attempt_index values MUST be 0..2; got {sorted(attempt_indices)}" + ) + assert len(parent_span_ids) == 1, ( + "all attempt spans MUST share the same parent (sibling-level under the invocation); " + f"got {len(parent_span_ids)} distinct parents" + ) + + # Status assertions. + by_attempt = { + cast("int", dict(s.attributes or {})["openarmature.node.attempt_index"]): s for s in attempt_spans + } + if expected_error is not None: + # All three attempts ERROR. + for idx in (0, 1, 2): + assert by_attempt[idx].status.status_code == StatusCode.ERROR, ( + f"attempt {idx} status MUST be ERROR (retry exhausted); " + f"got {by_attempt[idx].status.status_code}" + ) + else: + # Attempts 0 + 1 ERROR, attempt 2 OK. + for idx in (0, 1): + assert by_attempt[idx].status.status_code == StatusCode.ERROR, ( + f"attempt {idx} status MUST be ERROR (failed before retry succeeded); " + f"got {by_attempt[idx].status.status_code}" + ) + assert by_attempt[2].status.status_code == StatusCode.OK, ( + f"attempt 2 status MUST be OK (success on third attempt); got {by_attempt[2].status.status_code}" + ) + + +# --------------------------------------------------------------------------- +# Fixture 011 — determinism +# --------------------------------------------------------------------------- + + +# Spec-canonical attributes that are non-deterministic by design and +# MUST be excluded from determinism-comparison runs. Per spec +# coordination on the fixture (08-spec-prep-sync-confirmed reaffirms +# the §3.2 + §5.1 distinction): caller-supplied correlation_id is +# deterministic; auto-generated UUIDv4 is not. Fixture 011 omits +# ``caller_correlation_id``, so the auto-generated correlation_id IS +# in the ignore set for this fixture. +_DETERMINISM_IGNORED_ATTRS: frozenset[str] = frozenset( + { + "openarmature.invocation_id", + "openarmature.correlation_id", + } +) + + +async def _run_fixture_011(spec: Mapping[str, Any]) -> None: + """Spec §8: deterministic span content is identical across two + invocations of the same graph with the same input. The + signature compared per-span: + ``(name, status_code, parent_name, attrs ∖ ignored_set)``. + Parent linkage is encoded as the parent span's NAME rather + than its span_id (span_ids are non-deterministic per OTel SDK's + default RandomIdGenerator); a hierarchy regression where a + node reparented to a different ancestor surfaces as a + parent_name divergence.""" + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + case_name = cast("str", case["name"]) + try: + await _run_fixture_011_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_fixture_011_case(case: Mapping[str, Any]) -> None: + # Translate the fixture's ``when:`` conditional-edge syntax + # (``when: {field: counter, gt: 0}``) into the adapter's + # ``condition: {if_field, equals, then, else}`` shape. The + # adapter doesn't have a ``gt`` builder, but the deterministic + # input means ``counter == 1`` always — so ``gt: 0`` is + # functionally equivalent to ``equals: 1`` for this fixture's + # flow. The determinism comparison itself doesn't depend on + # which adapter construct represents the edge; the same + # branch always fires under identical inputs. (Generic + # ``gt``/``lt``/etc. edge-condition support is tracked under + # the Harness backlog in + # ``openarmature-coord/docs/phase-6-1-conformance-fillin.md``.) + case_for_build = _translate_011_when_edges(case) + + invocations = int(case.get("invocations", 2)) + assert invocations == 2, f"fixture 011: expected invocations=2; got {invocations}" + + runs: list[list[Any]] = [] + for _ in range(invocations): + observer, exporter = _build_observer() + trace_log: list[str] = [] + built = build_graph(case_for_build, trace=trace_log) + compiled = built.builder.compile() + compiled.attach_observer(observer) + initial_state = built.initial_state(case.get("initial_state", {})) + await compiled.invoke(initial_state) + await compiled.drain() + observer.shutdown() + runs.append(list(exporter.get_finished_spans())) + + assert len(runs[0]) == len(runs[1]), ( + f"deterministic input MUST produce equal span counts; got {len(runs[0])} vs {len(runs[1])}" + ) + + # Compare each span's structural signature across runs. Span + # span_ids are non-deterministic, so we encode the parent + # linkage by looking up parent.span_id in the same run's + # by-id map and including the parent's NAME in the signature. + # That way a hierarchy regression (e.g., a node reparented + # from invocation to a sibling) shows up as a signature + # difference even though both spans' own attributes are + # unchanged. + def _signature( + span: Any, by_id: Mapping[int, Any] + ) -> tuple[str, str, str | None, tuple[tuple[str, Any], ...]]: + attrs = dict(span.attributes or {}) + deterministic_items = sorted( + (k, _normalize_attr_value(v)) for k, v in attrs.items() if k not in _DETERMINISM_IGNORED_ATTRS + ) + parent_name: str | None = None + if span.parent is not None: + parent_span = by_id.get(span.parent.span_id) + if parent_span is not None: + parent_name = cast("str", parent_span.name) + return ( + cast("str", span.name), + str(span.status.status_code), + parent_name, + tuple(deterministic_items), + ) + + by_id_run_0: dict[int, Any] = {} + for s in runs[0]: + if s.context is not None: + by_id_run_0[s.context.span_id] = s + by_id_run_1: dict[int, Any] = {} + for s in runs[1]: + if s.context is not None: + by_id_run_1[s.context.span_id] = s + sig_run_0 = sorted(_signature(s, by_id_run_0) for s in runs[0]) + sig_run_1 = sorted(_signature(s, by_id_run_1) for s in runs[1]) + assert sig_run_0 == sig_run_1, ( + f"deterministic span content MUST match across runs; " + f"first divergence: run_0={sig_run_0!r} vs run_1={sig_run_1!r}" + ) + + +def _normalize_attr_value(value: Any) -> Any: + """OTel attribute values can be tuple or list shapes for sequence + types depending on how they were set; normalize for comparison.""" + if isinstance(value, list): + return tuple(cast("list[Any]", value)) + if isinstance(value, tuple): + return cast("tuple[Any, ...]", value) + return value + + +def _translate_011_when_edges(case: Mapping[str, Any]) -> dict[str, Any]: + """Rewrite fixture 011's ``when: {field: counter, gt: 0}`` + edges into the adapter's ``condition: {if_field, equals, + then, else}`` shape. The deterministic input always satisfies + the branch, so the comparison can be ``equals: 1``.""" + new_case = cast("dict[str, Any]", copy.deepcopy(case)) + new_edges: list[Any] = [] + branch_when_edge: dict[str, Any] | None = None + branch_default_edge: dict[str, Any] | None = None + for edge in cast("list[dict[str, Any]]", new_case.get("edges", [])): + if "when" in edge: + branch_when_edge = edge + elif edge.get("from") == "branch" and "when" not in edge: + branch_default_edge = edge + else: + new_edges.append(edge) + if branch_when_edge is not None and branch_default_edge is not None: + when = cast("dict[str, Any]", branch_when_edge["when"]) + if_field = cast("str", when["field"]) + # gt: 0 with the deterministic input (counter == 1) → + # equals: 1 is equivalent for this fixture's flow. + new_edges.append( + { + "from": "branch", + "condition": { + "if_field": if_field, + "equals": 1, + "then": branch_when_edge["to"], + "else": branch_default_edge["to"], + }, + } + ) + elif branch_when_edge is not None or branch_default_edge is not None: + raise AssertionError("fixture 011: expected paired when/default edges from 'branch'") + new_case["edges"] = new_edges + return new_case + + # --------------------------------------------------------------------------- # Fixture 009 — correlation_id cross-cutting # ---------------------------------------------------------------------------