From cb2cf1cb4e852789bae4c9db16496eb1aea4eb24 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 08:23:29 -0700 Subject: [PATCH 1/2] Add Langfuse observer (proposal 0031) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New openarmature.observability.langfuse subpackage maps the observer event stream onto Langfuse's native Trace + Observation data model: invocation -> Trace, node/subgraph/fan-out -> Span observation, LLM provider call -> Generation observation. The LangfuseObserver consumes the event stream and emits through a narrow LangfuseClient Protocol with four methods (trace, span, generation, update_trace). Two concrete clients: - InMemoryLangfuseClient bundled for tests and the conformance harness; captures every Trace + Observation as plain dataclass records inspectable by assertions. - A real langfuse.Langfuse() SDK instance is Protocol-compatible and drops in unchanged for production use. SDK versions whose shape diverges plug in via a small adapter the user writes; the shape is documented in examples/10-langfuse-observability. Trace id equals the framework-minted invocation_id verbatim so cross-system lookup by invocation_id finds the Langfuse Trace directly. correlation_id surfaces on both trace.metadata and every observation.metadata for cross-backend join. Trace name sources from the entry-node name; the caller-supplied invocation-label path lands in proposal 0034 (PR 4). Generation rendering follows the §8.7 contract: input/output/ request_extras appear only when disable_llm_payload=False; the §5.5.5 truncation marker is preserved verbatim as a raw string when the serialized payload exceeds payload_byte_cap. Prompt linkage follows §8.4.4 case discrimination: reads Prompt.observability_entities['langfuse_prompt'] (the field added in proposal 0033) to establish a native Prompt-entity link when the prompt's source exposes one. Backends without that reference (filesystem, in-memory) produce metadata-only linkage. Three conformance fixtures pass: 022 basic trace, 023 Generation rendering plus truncation case, 024 prompt linkage both cases. Seven new unit tests cover payload-cap validation, in-memory recorder field handling, and observation parent walking. Example 10-langfuse-observability is a runnable moon-themed demo: a lunar mission Q&A pipeline with a mock Langfuse-source prompt backend, the LangfuseObserver wired to the in-memory recorder, and a pretty-printer for the captured Trace tree at the end. Real SDK swap is a one-line constructor change. Third of 6 PRs in the v0.10.0 batch. --- docs/concepts/observability.md | 97 ++++ docs/examples/10-langfuse-observability.md | 159 ++++++ examples/10-langfuse-observability/main.py | 292 ++++++++++ mkdocs.yml | 1 + src/openarmature/AGENTS.md | 1 + .../observability/langfuse/__init__.py | 56 ++ .../observability/langfuse/client.py | 423 ++++++++++++++ .../observability/langfuse/observer.py | 518 +++++++++++++++++ tests/conformance/test_fixture_parsing.py | 17 +- .../test_observability_langfuse.py | 531 ++++++++++++++++++ tests/test_examples_smoke.py | 1 + tests/unit/test_observability_langfuse.py | 116 ++++ 12 files changed, 2209 insertions(+), 3 deletions(-) create mode 100644 docs/examples/10-langfuse-observability.md create mode 100644 examples/10-langfuse-observability/main.py create mode 100644 src/openarmature/observability/langfuse/__init__.py create mode 100644 src/openarmature/observability/langfuse/client.py create mode 100644 src/openarmature/observability/langfuse/observer.py create mode 100644 tests/conformance/test_observability_langfuse.py create mode 100644 tests/unit/test_observability_langfuse.py diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index e5521cc..1b06549 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -586,3 +586,100 @@ appear dropped. Two workarounds: - Use `SimpleSpanProcessor` instead of `BatchSpanProcessor` in tests; it exports synchronously and is unaffected by teardown timing. + +## Langfuse mapping (opt-in) + +A second sibling observer maps the same `NodeEvent` stream onto +Langfuse's native Trace + Observation data model — Traces at the +top, Span observations for graph nodes, Generation observations for +LLM calls. Use it instead of (or alongside) the OTel observer when +your trace UI is Langfuse and you want first-class Generation +rendering without going through Langfuse's OTLP ingest. + +```python +from openarmature.observability.langfuse import ( + InMemoryLangfuseClient, + LangfuseObserver, +) + +client = InMemoryLangfuseClient() # or langfuse.Langfuse(...) in prod +observer = LangfuseObserver(client=client) +graph.attach_observer(observer) +``` + +The `client` is anything matching the `LangfuseClient` Protocol — +the bundled `InMemoryLangfuseClient` (used by the conformance +harness, useful for unit tests), or a real `langfuse.Langfuse()` +instance from the [Langfuse Python SDK](https://github.com/langfuse/langfuse-python). +The Protocol declares only the methods the observer calls, so SDK +versions whose shape matches drop in directly. SDK versions whose +shape diverges (renamed kwargs, return-type quirks) plug in via a +small adapter; see +[`examples/10-langfuse-observability`](../examples/10-langfuse-observability.md) +for the runnable demo plus the adapter shape. + +### What Langfuse sees + +- **Trace ID = invocation ID.** The Trace's `id` is the OA + `invocation_id` verbatim, so cross-system lookup by invocation_id + finds the Langfuse Trace directly (spec §8.4.1). +- **Trace name.** Defaults to the entry-node name (spec §8.6 + fallback). Caller-supplied invocation labels land in PR 4 + (proposal 0034). +- **Per-observation metadata.** Each Span / Generation carries + `namespace`, `step`, `attempt_index`, optional `fan_out_index` / + `branch_name`, and the `correlation_id` cross-cutting join key + (spec §8.5). +- **Generation fields.** LLM calls become Generation observations + with `model`, `model_parameters` (the `gen_ai.request.*` request + parameters lifted by inclusion per §8.4.3), `usage` (input / + output / total tokens), and `metadata.finish_reason` / + `system` / `response_model` / `response_id`. + +### Payload + truncation + +`disable_llm_payload` mirrors the OTel observer's flag — defaults +to `True` for the same privacy reason. Flip to `False` to populate +`generation.input` / `output` / `metadata.request_extras` from the +LLM event payload. + +```python +observer = LangfuseObserver( + client=client, + disable_llm_payload=False, + payload_byte_cap=65536, +) +``` + +When a payload exceeds `payload_byte_cap`, the observer emits the +serialized form with the §5.5.5 truncation marker +(`…[truncated, M bytes total]`) verbatim as a raw string instead of +parsing back to native shape. The unparseable JSON IS the +truncation signal in the Langfuse UI. + +### Prompt linkage + +When a Prompt's source backend exposes a Langfuse Prompt entity +reference under `Prompt.observability_entities['langfuse_prompt']`, +the Generation observation links to that entity natively (spec +§8.4.4 case 1). Backends that don't surface a Langfuse reference +(filesystem, in-memory, etc.) leave the Generation with +`metadata.prompt` populated but no entity link (case 2). + +### Composition with OTel + +The two observers are independent §6 event consumers and can be +attached together. They share the `correlation_id` as the +cross-backend join key — find a slow Generation in Langfuse, search +for its `correlation_id` in OTel logs, see the surrounding +infrastructure activity. + +```python +otel_observer = OTelObserver(span_processor=...) +langfuse_observer = LangfuseObserver(client=langfuse_client) +graph.attach_observer(otel_observer) +graph.attach_observer(langfuse_observer) +``` + +Each observer's `disable_llm_spans` / `disable_llm_payload` flag is +independent; one MAY emit while the other suppresses. diff --git a/docs/examples/10-langfuse-observability.md b/docs/examples/10-langfuse-observability.md new file mode 100644 index 0000000..0d42ca3 --- /dev/null +++ b/docs/examples/10-langfuse-observability.md @@ -0,0 +1,159 @@ +# 10 - Langfuse observability + +Send LLM call observability to Langfuse natively — Trace at the top, +Span observations for graph nodes, Generation observations with input, +output, token usage, model parameters, and a native link back to the +prompt entity the call rendered from. + +## Overview + +A mission-briefing assistant answers questions about Apollo and Artemis +missions. The pipeline fetches a versioned prompt template, renders it +with the user's question, sends the rendered messages to the model, +and stores the response. The Langfuse observer captures the full call +shape as the graph runs. + +The demo's prompt backend stubs a Langfuse-source by attaching a +sentinel `langfuse_prompt` reference to the rendered prompt. The +Generation observation reads that reference and links back to the +prompt entity — exactly what you'd see in a production Langfuse +dashboard threading "this generation came from prompt v7" without any +manual wiring at the call site. + +## What it teaches + +- [`LangfuseObserver`](../concepts/observability.md#langfuse-mapping-opt-in) + attaches like any other observer; nothing in the node code knows or + cares about which backend is recording. +- The `LangfuseClient` Protocol decouples the observer from the SDK. + The bundled `InMemoryLangfuseClient` recorder is the test/demo + shape; production passes a real `langfuse.Langfuse()` instance (or + a thin adapter — see [Reading the output](#reading-the-output) + below). +- Prompt linkage through + [`Prompt.observability_entities`](../concepts/prompts.md#backend-keyed-observability-entity-references): + a prompt backend that exposes a Langfuse Prompt entity reference + surfaces it on every Generation that renders from that prompt. + Filesystem / in-memory backends without that reference work too, + they just produce metadata-only linkage. +- `disable_llm_payload=False` opt-in for capturing input messages + + output content on Generation observations. Default-off is the + privacy posture; the demo deliberately flips it. +- `correlation_id` cross-cutting metadata on the Trace and every + Observation — the join key if you're also running an OTel observer + alongside. + +## How to run + +```bash +uv sync --group examples +LLM_API_KEY=sk-... uv run python examples/10-langfuse-observability/main.py \ + "what year did Apollo 11 land" +``` + +The first positional arg becomes the question. The demo uses an +in-memory recorder so no Langfuse account is needed. + +## The graph + +```mermaid +flowchart TD + start([start]) + answer[answer_briefing] + stop([end]) + + start --> answer --> stop +``` + +A single-node graph: fetch the prompt, render with the question, call +the LLM under `with_active_prompt(...)`, store the response. The +single node is deliberate — the value is in the captured Trace shape, +not the graph topology. + +## Reading the output + +After the answer prints, the script renders the captured Langfuse +Trace + Observation tree: + +``` +question: what year did Apollo 11 land +answer: Apollo 11 landed on the Moon on July 20, 1969 ... +prompt: mission-briefing v7 + +─── captured Langfuse trace ───────────────────────────────── +Trace id=01234567-89ab-... + name='answer_briefing' + metadata={correlation_id='...', entry_node='answer_briefing', spec_version='0.26.0'} + [span] 'answer_briefing' level=DEFAULT + metadata={attempt_index=0, correlation_id='...', namespace=['answer_briefing'], step=0} + [generation] 'openarmature.llm.complete' level=DEFAULT + metadata={correlation_id='...', finish_reason='stop', prompt={...}, + response_id='...', response_model='gpt-4o-mini-2024-...', + system='openai'} + model='gpt-4o-mini' + usage=input:48 output:32 total:80 + prompt_entity_link='lf-prompt-mission-briefing-v7' + output='Apollo 11 landed on the Moon on July 20, 1969 ...' +``` + +- **Trace name = entry node name** by default. The caller-supplied + invocation-label path (a per-`invoke()` argument that overrides the + default) ships with proposal 0034's caller-metadata work. +- **Span observation per node.** `answer_briefing` is the only node + here; a multi-node graph would produce a tree of nested Span + observations under the Trace. +- **Generation observation per LLM call.** Carries `model`, `usage`, + `output`, and the prompt-identity metadata. In a production Langfuse + dashboard this is what the "Generation" detail view renders. +- **`prompt_entity_link`** is the value `Prompt.observability_entities['langfuse_prompt']` + carried — a sentinel string in this demo, a real Langfuse SDK Prompt + object in production. When the backend doesn't surface the reference + (e.g., a filesystem backend), the link is absent but the + `metadata.prompt` map (name, version, label, hashes) still appears + for traceability. + +## Swapping to a real Langfuse SDK + +The observer's `client` parameter is `LangfuseClient`-Protocol-typed, +so any structurally-compatible value works: + +```python +from langfuse import Langfuse + +client = Langfuse( + public_key="pk-lf-...", + secret_key="sk-lf-...", + host="https://cloud.langfuse.com", +) +observer = LangfuseObserver(client=client, disable_llm_payload=False) +``` + +If the installed SDK version's `trace` / `span` / `generation` method +signatures match the Protocol exactly, this is the whole change. If +they diverge (renamed kwargs, return-type quirks), wrap the SDK in a +small adapter class that implements `LangfuseClient` and delegates to +the SDK call-by-call. The Protocol surface is narrow — four methods — +so the adapter is on the order of 40 lines. + +For prompt linkage: in production, the +`Prompt.observability_entities['langfuse_prompt']` value is the SDK's +own Prompt-entity object (returned by `langfuse_client.get_prompt(...)`) +rather than the sentinel string this demo uses. The observer passes +that value straight through to the SDK's `generation(..., prompt=...)` +argument, which is what the SDK uses to establish the native link. + +## Composition with OTel + +Both observers consume the same `NodeEvent` stream and can be attached +together: + +```python +graph.attach_observer(OTelObserver(span_processor=batch)) +graph.attach_observer(LangfuseObserver(client=langfuse_client)) +``` + +Their `disable_llm_spans` / `disable_llm_payload` flags are +independent. The `correlation_id` cross-cutting attribute is the join +key — find a slow Generation in Langfuse, search for the +`correlation_id` in OTel logs to see the surrounding infrastructure +activity. diff --git a/examples/10-langfuse-observability/main.py b/examples/10-langfuse-observability/main.py new file mode 100644 index 0000000..0fa09db --- /dev/null +++ b/examples/10-langfuse-observability/main.py @@ -0,0 +1,292 @@ +"""openarmature demo: Langfuse observer + prompt linkage on a lunar mission Q&A pipeline. + +**Use case:** A mission-briefing assistant answers questions about Apollo +and Artemis missions. The pipeline fetches a versioned prompt template, +renders it with the user's question, sends it to the model, and stores +the response. The team running this in production wants to validate the +prompt is doing what it should — see exactly what messages went out, +what the model returned, what the token usage was, and (critically) +which prompt version produced which response so they can A/B test +prompt revisions safely. + +**Demonstrates:** The Langfuse-native observer that maps every node and +LLM call into a Langfuse Trace + Observation tree. The demo's prompt +backend simulates a Langfuse-aware source by attaching a sentinel +Langfuse Prompt entity reference to each rendered prompt; the Generation +observation picks that up and links back to the entity, which is how +production Langfuse dashboards thread "this generation came from prompt +v7 of `mission-briefing`" without you having to wire anything up +manually. + +The example uses the bundled ``InMemoryLangfuseClient`` recorder so the +demo runs without a Langfuse account — at the end we print the captured +Trace + Observation tree. Swapping to a real ``langfuse.Langfuse()`` +client is a one-line constructor change (see the comment near the +observer build below). + +LLM calls go through ``openarmature.llm.OpenAIProvider``. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). + +Run with: + + uv sync --group examples + cd examples/10-langfuse-observability + LLM_API_KEY=sk-... uv run python main.py "what year did Apollo 11 land" + LLM_API_KEY=sk-... uv run python main.py "compare the Artemis II crew to Apollo 8's" +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +from datetime import UTC, datetime +from typing import Any + +from openarmature.graph import END, CompiledGraph, GraphBuilder, State +from openarmature.llm import OpenAIProvider +from openarmature.observability.langfuse import ( + InMemoryLangfuseClient, + LangfuseObservation, + LangfuseObserver, + LangfuseTrace, +) +from openarmature.prompts import Prompt, PromptManager, PromptResult +from openarmature.prompts.context import with_active_prompt + +_provider_instance: OpenAIProvider | None = None + + +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance + + +# ---------------------------------------------------------------------------- +# Mock prompt backend with a Langfuse-source reference +# ---------------------------------------------------------------------------- +# A real production setup would use the Langfuse Python SDK's +# ``LangfusePromptBackend`` (community / forthcoming sibling-package +# territory) which fetches from the Langfuse Prompts API and attaches +# the SDK's Prompt-entity reference to ``Prompt.observability_entities``. +# We stub that here so the demo doesn't need a Langfuse account: the +# sentinel string ``"lf-prompt-mission-briefing-v7"`` stands in for what +# would normally be an SDK Prompt-entity object. + + +class _MockLangfusePromptBackend: + """In-memory PromptBackend that simulates a Langfuse-source by + attaching a sentinel ``langfuse_prompt`` entity reference. + + The Langfuse observer reads + ``Prompt.observability_entities['langfuse_prompt']`` when emitting + the Generation observation. In production that key holds a real + Langfuse SDK Prompt object; here it's a string sentinel so the + captured Trace shows the linkage shape without needing a real SDK. + """ + + def __init__(self) -> None: + now = datetime.now(UTC) + self._prompt = Prompt( + name="mission-briefing", + version="v7", + label="production", + template=( + "You are a lunar mission historian. Answer the following " + "question in two short sentences with specific dates or " + "crew names when relevant.\n\n" + "Question: {{ question }}" + ), + template_hash="sha256:mission-briefing-v7", + fetched_at=now, + observability_entities={ + "langfuse_prompt": "lf-prompt-mission-briefing-v7", + }, + ) + + async def fetch(self, name: str, label: str = "production") -> Prompt: + if name != "mission-briefing": + from openarmature.prompts import PromptNotFound + + raise PromptNotFound( + f"no prompt {name!r} in this demo backend", + name=name, + label=label, + backend="mock-langfuse", + ) + return self._prompt + + +# ---------------------------------------------------------------------------- +# State + node +# ---------------------------------------------------------------------------- + + +class BriefingState(State): + question: str + answer: str = "" + prompt_version: str = "" + + +_PROMPT_MANAGER = PromptManager(_MockLangfusePromptBackend()) + + +async def answer_briefing(s: BriefingState) -> dict[str, Any]: + """Fetch the briefing prompt, render with the question, send to the LLM. + + ``with_active_prompt(rendered)`` is what makes the Generation + observation surface the prompt-identity metadata + Langfuse Prompt + entity link. The Langfuse observer reads the active PromptResult at + dispatch time and threads it onto the Generation observation it + emits. + """ + rendered: PromptResult = await _PROMPT_MANAGER.get( + "mission-briefing", "production", {"question": s.question} + ) + provider = _get_provider() + with with_active_prompt(rendered): + response = await provider.complete(rendered.messages) + return { + "answer": response.message.content or "", + "prompt_version": rendered.version, + } + + +def build_graph() -> CompiledGraph[BriefingState]: + return ( + GraphBuilder(BriefingState) + .add_node("answer_briefing", answer_briefing) + .add_edge("answer_briefing", END) + .set_entry("answer_briefing") + .compile() + ) + + +# ---------------------------------------------------------------------------- +# Pretty-printer for the captured Langfuse Trace +# ---------------------------------------------------------------------------- + + +def _format_trace(trace: LangfuseTrace) -> str: + """Render the captured Trace + Observation tree as a human-readable string. + + Production Langfuse renders this same data in the web UI; the + in-memory recorder gives us the same structured shape so we can + print it to stdout for the demo. + """ + lines: list[str] = [] + lines.append(f"Trace id={trace.id}") + lines.append(f" name={trace.name!r}") + lines.append(f" metadata={_format_metadata(trace.metadata)}") + for obs in trace.children_of(None): + _format_observation(lines, trace, obs, indent=" ") + return "\n".join(lines) + + +def _format_observation( + lines: list[str], trace: LangfuseTrace, obs: LangfuseObservation, indent: str +) -> None: + summary = f"{indent}[{obs.type}] {obs.name!r} level={obs.level}" + lines.append(summary) + if obs.metadata: + lines.append(f"{indent} metadata={_format_metadata(obs.metadata)}") + if obs.type == "generation": + if obs.model is not None: + lines.append(f"{indent} model={obs.model!r}") + if obs.usage is not None: + lines.append( + f"{indent} usage=input:{obs.usage.input} output:{obs.usage.output} total:{obs.usage.total}" + ) + if obs.prompt_entity_link is not None: + lines.append(f"{indent} prompt_entity_link={obs.prompt_entity_link!r}") + if obs.output is not None: + out_str = obs.output if isinstance(obs.output, str) else json.dumps(obs.output) + lines.append(f"{indent} output={out_str[:120]!r}{'...' if len(out_str) > 120 else ''}") + for child in trace.children_of(obs.id): + _format_observation(lines, trace, child, indent=indent + " ") + + +def _format_metadata(metadata: dict[str, Any]) -> str: + # Sort keys for stable demo output across runs. correlation_id is a + # UUIDv4 so its exact value changes every run; truncate it for + # readability without losing the "yes, this is set" signal. + parts: list[str] = [] + for key in sorted(metadata): + value = metadata[key] + if key == "correlation_id" and isinstance(value, str) and len(value) > 12: + value = f"{value[:8]}…" + parts.append(f"{key}={value!r}") + return "{" + ", ".join(parts) + "}" + + +# ---------------------------------------------------------------------------- +# Entry point +# ---------------------------------------------------------------------------- + + +async def main() -> None: + question = " ".join(sys.argv[1:]) or "what year did Apollo 11 land" + + # The bundled in-memory client captures everything the observer + # would have sent to Langfuse — Trace, Observations, Generation + # fields — without needing a Langfuse account. For production: + # + # from langfuse import Langfuse + # client = Langfuse(public_key=..., secret_key=..., host=...) + # + # Replace the InMemoryLangfuseClient construction below with that + # client. The observer code doesn't change — the client is + # Protocol-typed, so any structurally-compatible value works. + client = InMemoryLangfuseClient() + + # disable_llm_payload=False opts in to capturing the input messages + # and output content on Generation observations. Default is True + # for the same privacy reason the OTel observer's flag exists — + # payloads may contain PII the operator hasn't audited. Flip it + # deliberately here because the demo's whole point is showing what + # the model saw and returned. + observer = LangfuseObserver(client=client, disable_llm_payload=False) + + graph = build_graph() + graph.attach_observer(observer) + + try: + final = await graph.invoke(BriefingState(question=question)) + finally: + # Required for short-lived processes: invoke() returns when the + # graph reaches END regardless of whether the observer queue + # has finished draining. Without drain() the last few + # observation calls (the LLM completion's `.end()`, the node's + # close) can be dropped on process exit. + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() + + print() + print(f"question: {final.question}") + print(f"answer: {final.answer}") + print(f"prompt: mission-briefing {final.prompt_version}") + print() + print("─── captured Langfuse trace ─────────────────────────────────") + # Exactly one Trace per invocation — the LangfuseObserver opens it + # on the first node event and the trace id equals the framework- + # minted invocation_id so cross-system lookups land directly. + assert len(client.traces) == 1, f"expected 1 trace, got {len(client.traces)}" + trace = next(iter(client.traces.values())) + print(_format_trace(trace)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/mkdocs.yml b/mkdocs.yml index 53acfc3..b3537e2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -133,6 +133,7 @@ nav: - Multimodal prompt: examples/07-multimodal-prompt.md - Checkpointing and migration: examples/08-checkpointing-and-migration.md - Tool use: examples/09-tool-use.md + - Langfuse observability: examples/10-langfuse-observability.md - Model Providers: - model-providers/index.md - Authoring a Provider: model-providers/authoring.md diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 9daadfb..ed22fee 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -963,6 +963,7 @@ _Runnable example programs shipped in the source tree at `examples/`. The full c - **`examples/07-multimodal-prompt/main.py`** — openarmature demo: two independent analyses of a lunar-mission photograph using versioned prompt templates, a fallback prompt backend, and a multimodal user message. - **`examples/08-checkpointing-and-migration/main.py`** — openarmature demo: a lunar-mission planning pipeline that checkpoints its progress, then resumes under an upgraded state schema. - **`examples/09-tool-use/main.py`** — openarmature demo: a lunar-mission assistant that calls local Python functions as tools to answer fact and physics questions about Apollo / Artemis missions. +- **`examples/10-langfuse-observability/main.py`** — openarmature demo: Langfuse observer + prompt linkage on a lunar mission Q&A pipeline. ## Discovery cross-references diff --git a/src/openarmature/observability/langfuse/__init__.py b/src/openarmature/observability/langfuse/__init__.py new file mode 100644 index 0000000..e857fbb --- /dev/null +++ b/src/openarmature/observability/langfuse/__init__.py @@ -0,0 +1,56 @@ +# Spec mapping: realizes observability §8 (Langfuse backend mapping). +# Sibling to the OTel mapping in ``observability.otel``; both are +# self-contained consumers of the §6 event stream. +# +# Unlike the OTel subpackage, this one does NOT extras-gate: the +# observer is decoupled from any concrete SDK via the +# :class:`LangfuseClient` Protocol, and the bundled +# :class:`InMemoryLangfuseClient` recorder satisfies it without +# requiring the real ``langfuse`` package. Production users pass a +# real ``langfuse.Langfuse()`` instance (Protocol-compatible with the +# methods the observer calls) or write a thin adapter; the +# ``langfuse`` SDK install is on them. + +"""Langfuse backend mapping for openarmature observability. + +Public surface: + +- :class:`LangfuseObserver` — observer-driven Langfuse Trace + + Observation emission per spec observability §8. +- :class:`LangfuseClient` — Protocol the observer calls. Satisfied by + the bundled :class:`InMemoryLangfuseClient` and (structurally) by + the real ``langfuse.Langfuse`` SDK class. +- :class:`InMemoryLangfuseClient` — in-process recorder used by the + conformance harness and useful for unit tests. +- :class:`LangfuseTrace` / :class:`LangfuseObservation` / + :class:`LangfuseUsage` — captured-data records returned by the + recorder. +""" + +from __future__ import annotations + +from .client import ( + InMemoryLangfuseClient, + LangfuseClient, + LangfuseGenerationHandle, + LangfuseObservation, + LangfuseSpanHandle, + LangfuseTrace, + LangfuseUsage, + ObservationLevel, + ObservationType, +) +from .observer import LangfuseObserver + +__all__ = [ + "InMemoryLangfuseClient", + "LangfuseClient", + "LangfuseGenerationHandle", + "LangfuseObservation", + "LangfuseObserver", + "LangfuseSpanHandle", + "LangfuseTrace", + "LangfuseUsage", + "ObservationLevel", + "ObservationType", +] diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py new file mode 100644 index 0000000..8a55280 --- /dev/null +++ b/src/openarmature/observability/langfuse/client.py @@ -0,0 +1,423 @@ +# Spec mapping (observability §8): +# - Captures the minimal Langfuse SDK-shaped surface the LangfuseObserver +# calls — `trace(...)` returning a handle with `.span(...)` / +# `.generation(...)` / `.update(...)` / `.end()`. +# - Protocol-typed so the observer is decoupled from any concrete SDK +# version; users plug in `langfuse.Langfuse()` directly (or a thin +# adapter) for production, and the bundled `InMemoryLangfuseClient` +# for tests / conformance fixtures. +# - All record types are plain dataclasses so the in-memory client's +# captured data is trivially inspectable from test code. + +"""Langfuse client Protocol + in-memory recorder. + +The :class:`LangfuseObserver` consumes the §6 OA event stream and +emits Langfuse Trace + Observation entities through a +:class:`LangfuseClient`. The Protocol is intentionally narrow: it +declares only the methods the observer calls. Concrete sinks: + +- :class:`InMemoryLangfuseClient` — captures everything in dataclass + records. Used by the conformance harness; useful for unit tests. +- A real :class:`langfuse.Langfuse` instance — Protocol-compatible + given the SDK's current shape. Pass it directly to the observer + in production code. + +Future PRs MAY ship a thin adapter for SDK versions whose shape +diverges from the Protocol; for now the in-memory client is the +reference implementation. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal, Protocol, runtime_checkable + +ObservationType = Literal["span", "generation", "event"] + +# Langfuse-supported `level` values per spec §8.4.2 (statusMessage pair). +ObservationLevel = Literal["DEFAULT", "DEBUG", "INFO", "WARNING", "ERROR"] + + +@dataclass +class LangfuseUsage: + """Langfuse Generation `usage` record. Field names match Langfuse SDK.""" + + input: int | None = None + output: int | None = None + total: int | None = None + + +@dataclass +class LangfuseObservation: + """A single Langfuse Observation captured by an in-memory client. + + Carries the observation's type-discriminated shape — Spans hold + timing + metadata; Generations add model/parameters/usage/input/ + output/prompt-entity link; Events are point-in-time markers + (reserved per spec §8.2 — not used by this version of the mapping). + """ + + id: str + type: ObservationType + name: str | None = None + metadata: dict[str, Any] = field(default_factory=dict[str, Any]) + parent_observation_id: str | None = None + level: ObservationLevel = "DEFAULT" + status_message: str | None = None + ended: bool = False + + # Generation-specific (None / empty on Span and Event observations) + model: str | None = None + model_parameters: dict[str, Any] = field(default_factory=dict[str, Any]) + input: Any = None + output: Any = None + usage: LangfuseUsage | None = None + # Opaque reference set when §8.4.4 case 1 triggers — equals + # ``Prompt.observability_entities["langfuse_prompt"]`` from the + # prompt-management capability (proposal 0033). Production + # adapters surface this as a real Langfuse SDK Prompt link; the + # in-memory client just records the value verbatim for inspection. + prompt_entity_link: Any = None + + +@dataclass +class LangfuseTrace: + """A single Langfuse Trace captured by an in-memory client. + + The Trace owns its Observation tree. Observations carry their own + `parent_observation_id`; callers MAY walk to render a tree view. + """ + + id: str + name: str | None = None + metadata: dict[str, Any] = field(default_factory=dict[str, Any]) + observations: list[LangfuseObservation] = field(default_factory=list[LangfuseObservation]) + + def find_observation(self, observation_id: str) -> LangfuseObservation | None: + for obs in self.observations: + if obs.id == observation_id: + return obs + return None + + def children_of(self, parent_id: str | None) -> list[LangfuseObservation]: + return [o for o in self.observations if o.parent_observation_id == parent_id] + + +# Handle Protocols --------------------------------------------------------- +# The Langfuse SDK exposes stateful handles (StatefulSpanClient / +# StatefulGenerationClient) returned from create-calls. The observer +# pins these to update / end observations as the corresponding OA span +# closes. The Protocols below declare only the methods the observer +# touches; SDK clients satisfy them structurally. + + +@runtime_checkable +class LangfuseSpanHandle(Protocol): + """In-flight Span observation handle returned by `client.span(...)`.""" + + @property + def id(self) -> str: ... + + def update(self, **fields: Any) -> None: ... + + def end(self, **fields: Any) -> None: ... + + +@runtime_checkable +class LangfuseGenerationHandle(Protocol): + """In-flight Generation observation handle returned by + `client.generation(...)`.""" + + @property + def id(self) -> str: ... + + def update(self, **fields: Any) -> None: ... + + def end(self, **fields: Any) -> None: ... + + +@runtime_checkable +class LangfuseClient(Protocol): + """Minimal client surface the LangfuseObserver requires. + + Method shape mirrors the Langfuse Python SDK's low-level API: + `client.trace(...)` creates a Trace; `client.span(trace_id=..., ...)` + opens a Span observation; `client.generation(trace_id=..., ...)` + opens a Generation observation. Each returns a stateful handle + the observer keeps and `.end()`s when the corresponding OA span + closes. + + The Protocol does NOT define `event(...)` — Event observations + are reserved by §8.2 but not used in v0.23.0 of the mapping. + """ + + def trace( + self, + *, + id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + """Create a new Trace. + + The Trace `id` MUST be the OA invocation_id verbatim (§8.4.1). + Implementations track Traces internally; observation calls + pass `trace_id` to associate. + """ + ... + + def update_trace( + self, + *, + id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + """Update an existing Trace's mutable fields after creation. + + Used by the observer when the caller-supplied invocation + label (§8.6) lands later than the Trace's open call, or when + additional metadata becomes available mid-invocation. + """ + ... + + def span( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + ) -> LangfuseSpanHandle: ... + + def generation( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + model: str | None = None, + model_parameters: dict[str, Any] | None = None, + input: Any = None, + output: Any = None, + usage: LangfuseUsage | None = None, + prompt: Any = None, + ) -> LangfuseGenerationHandle: ... + + +# Concrete in-memory implementation --------------------------------------- +# Used by tests and the conformance harness. Stores everything the +# observer pushes verbatim so assertions can inspect the captured +# shape directly. + + +@dataclass +class _InMemorySpanHandle: + """Stateful handle pinned by an InMemoryLangfuseClient.""" + + observation: LangfuseObservation + + @property + def id(self) -> str: + return self.observation.id + + def update(self, **fields: Any) -> None: + _apply_fields(self.observation, fields) + + def end(self, **fields: Any) -> None: + _apply_fields(self.observation, fields) + self.observation.ended = True + + +@dataclass +class _InMemoryGenerationHandle: + """Stateful handle pinned by an InMemoryLangfuseClient.""" + + observation: LangfuseObservation + + @property + def id(self) -> str: + return self.observation.id + + def update(self, **fields: Any) -> None: + _apply_fields(self.observation, fields) + + def end(self, **fields: Any) -> None: + _apply_fields(self.observation, fields) + self.observation.ended = True + + +def _apply_fields(observation: LangfuseObservation, fields: dict[str, Any]) -> None: + # Merge SDK-style kwargs into the captured observation. Maps the + # SDK's argument names onto LangfuseObservation's attribute names + # (e.g. `model_parameters` -> `model_parameters`; `usage` stays as + # `usage`). Unknown kwargs are stored on `metadata` so the in-memory + # client doesn't silently drop SDK extensions. + direct = { + "name", + "level", + "status_message", + "model", + "model_parameters", + "input", + "output", + "usage", + "prompt_entity_link", + } + metadata_update: dict[str, Any] = {} + for key, value in fields.items(): + if key in direct: + setattr(observation, key, value) + elif key == "metadata": + if value is not None: + observation.metadata.update(value) + elif key == "prompt": + # The SDK accepts a `prompt=` kwarg on + # `generation(...)`. Mirror that into our explicit + # ``prompt_entity_link`` slot. + observation.prompt_entity_link = value + else: + metadata_update[key] = value + if metadata_update: + observation.metadata.update(metadata_update) + + +@dataclass +class InMemoryLangfuseClient: + """In-memory recorder satisfying :class:`LangfuseClient`. + + Captures every Trace / Span / Generation the observer creates as + plain dataclass records reachable via :attr:`traces`. Tests assert + against the records directly rather than mocking SDK methods. + + The recorder mints observation IDs internally via a simple + counter; production callers (with a real `langfuse.Langfuse()` + client) get SDK-minted UUIDs instead. + """ + + traces: dict[str, LangfuseTrace] = field(default_factory=dict[str, LangfuseTrace]) + _next_observation_id: int = 0 + + def _mint_observation_id(self) -> str: + # Sequential integer suffixes keep test diffs stable across runs. + oid = f"obs-{self._next_observation_id}" + self._next_observation_id += 1 + return oid + + def trace( + self, + *, + id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + self.traces[id] = LangfuseTrace( + id=id, + name=name, + metadata=dict(metadata) if metadata is not None else {}, + ) + + def update_trace( + self, + *, + id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + trace = self.traces.get(id) + if trace is None: + # Treat update-before-create as a create; this shouldn't + # happen under the observer's emission order but stays + # defensive against re-ordered events. + self.trace(id=id, name=name, metadata=metadata) + return + if name is not None: + trace.name = name + if metadata is not None: + trace.metadata.update(metadata) + + def span( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + ) -> LangfuseSpanHandle: + trace = self._get_trace(trace_id) + observation = LangfuseObservation( + id=self._mint_observation_id(), + type="span", + name=name, + metadata=dict(metadata) if metadata is not None else {}, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + ) + trace.observations.append(observation) + return _InMemorySpanHandle(observation=observation) + + def generation( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + model: str | None = None, + model_parameters: dict[str, Any] | None = None, + input: Any = None, + output: Any = None, + usage: LangfuseUsage | None = None, + prompt: Any = None, + ) -> LangfuseGenerationHandle: + trace = self._get_trace(trace_id) + observation = LangfuseObservation( + id=self._mint_observation_id(), + type="generation", + name=name, + metadata=dict(metadata) if metadata is not None else {}, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + model=model, + model_parameters=dict(model_parameters) if model_parameters is not None else {}, + input=input, + output=output, + usage=usage, + prompt_entity_link=prompt, + ) + trace.observations.append(observation) + return _InMemoryGenerationHandle(observation=observation) + + def _get_trace(self, trace_id: str) -> LangfuseTrace: + trace = self.traces.get(trace_id) + if trace is None: + # Auto-create on first observation call. Real SDKs require + # the Trace to exist first; we tolerate observer-side + # ordering quirks by creating-on-demand. + trace = LangfuseTrace(id=trace_id) + self.traces[trace_id] = trace + return trace + + +__all__ = [ + "InMemoryLangfuseClient", + "LangfuseClient", + "LangfuseGenerationHandle", + "LangfuseObservation", + "LangfuseSpanHandle", + "LangfuseTrace", + "LangfuseUsage", + "ObservationLevel", + "ObservationType", +] diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py new file mode 100644 index 0000000..d4dcd0d --- /dev/null +++ b/src/openarmature/observability/langfuse/observer.py @@ -0,0 +1,518 @@ +# Spec mapping (observability §8): +# - Consumes the §6 observer event stream as a sibling to the OTel +# observer (§8.9 composition). +# - Maps invocation → Trace, node/subgraph/fan-out → Span observation, +# LLM provider → Generation observation (§8.3 table). +# - Sets the Trace `id` equal to the OA `invocation_id` so cross-system +# lookup by invocation_id finds the Langfuse Trace verbatim (§8.4.1). +# - Routes correlation_id to both `trace.metadata.correlation_id` and +# every `observation.metadata.correlation_id` per §8.5. +# - Sources Trace name from the entry-node name (§8.6 fallback). The +# caller-supplied invocation-label path lands in proposal 0034 (PR 4 +# of the v0.10.0 batch). +# - Generation rendering follows §8.7: input/output/request_extras +# appear only when `disable_llm_payload=False`; the truncation +# marker is preserved verbatim as a raw string when the §5.5.5 +# truncation makes the JSON unparseable. +# - Prompt linkage follows §8.4.4: reads +# `Prompt.observability_entities["langfuse_prompt"]` to establish a +# native Prompt-entity link when present; metadata-only otherwise. + +"""LangfuseObserver: maps OA events to Langfuse Traces + Observations.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, cast + +from openarmature.observability.llm_event import LLM_NAMESPACE, LlmEventPayload + +from .client import ( + LangfuseClient, + LangfuseGenerationHandle, + LangfuseSpanHandle, + LangfuseUsage, +) + +if TYPE_CHECKING: + from openarmature.graph.events import NodeEvent + + +# §5.5.5 / §8.7 truncation: when the serialized payload exceeds the +# configured cap, the marker below is appended and the unparseable +# JSON serves as the "this was truncated" signal in Langfuse's input +# / output / metadata.request_extras fields. +_TRUNCATION_MARKER_TEMPLATE = "…[truncated, {m} bytes total]" + +# §5.5.5 minimum-cap rule mirrors the OTel observer's bound. 256 bytes +# is the smallest value that fits the worst-case marker (~36 bytes) +# plus a diagnostically useful preview. +_PAYLOAD_MIN_BYTES = 256 + + +def _read_spec_version() -> str: + """Lazy spec-version read; mirrors the OTel observer's lookup so + Langfuse-side spec_version metadata stays in lockstep.""" + from openarmature import __spec_version__ + + return __spec_version__ + + +# In-flight Span observation handle, keyed by the standard span-stack +# key (namespace, attempt_index, fan_out_index). Mirrors the OTel +# observer's _OpenSpan shape but holds a Langfuse handle instead of an +# OTel Span. +_StackKey = tuple[tuple[str, ...], int, int | None] + + +@dataclass +class _OpenObservation: + """An in-flight Langfuse observation pinned in the observer's state.""" + + handle: LangfuseSpanHandle | LangfuseGenerationHandle + + +@dataclass +class _InvState: + """Per-invocation state, isolated by invocation_id. + + A single LangfuseObserver is safe to share across concurrent + invocations; each invocation's in-flight observations live under + its own _InvState so they never collide. + """ + + trace_id: str + open_observations: dict[_StackKey, _OpenObservation] = field( + default_factory=dict[_StackKey, _OpenObservation] + ) + open_llm_observations: dict[str, _OpenObservation] = field(default_factory=dict[str, _OpenObservation]) + + +@dataclass +class LangfuseObserver: + """Observer-driven Langfuse mapping per spec observability §8. + + Construct with a :class:`LangfuseClient` — the bundled + :class:`InMemoryLangfuseClient` for tests, or a real + ``langfuse.Langfuse()`` instance for production. The observer + handles the §6 event stream and emits Trace + Observation entities + through the client. + + Constructor knobs: + + - ``client``: the Langfuse sink (Protocol-typed). + - ``disable_llm_spans``: when ``True`` the observer skips + Generation observations on LLM provider events. + - ``disable_llm_payload``: default ``True`` per §8.9's "symmetric + privacy posture" with the OTel observer. Gates + ``generation.input`` / ``output`` / ``metadata.request_extras`` + emission. + - ``payload_byte_cap``: per-attribute byte cap on the source + payload string before parse-back. Mirrors the OTel observer's + ``payload_max_bytes`` semantic — emission preserves the raw + truncated string when the §5.5.5 marker is present (per §8.7). + Default 64 KiB; same minimum (256 bytes) applies. + + The observer reads the spec version from the package at + construction time. Safe to share across concurrent invocations + and across resumes of the same correlation_id; per-invocation + state isolation keys all internal maps by invocation_id. + """ + + client: LangfuseClient + disable_llm_spans: bool = False + disable_llm_payload: bool = True + payload_byte_cap: int = 65536 + spec_version: str = field(default_factory=_read_spec_version) + + # Internal state populated during invocation. + _inv_states: dict[str, _InvState] = field(init=False, repr=False, default_factory=dict[str, _InvState]) + + def __post_init__(self) -> None: + # §5.5.5 minimum-cap validation mirrors the OTel observer's bound. + # Reject misconfigurations at construction time rather than + # surfacing them as a Langfuse-ingest error later. + if self.payload_byte_cap < _PAYLOAD_MIN_BYTES: + raise ValueError( + f"payload_byte_cap={self.payload_byte_cap} below the spec §5.5.5 " + f"minimum of {_PAYLOAD_MIN_BYTES} bytes" + ) + + async def __call__(self, event: NodeEvent) -> None: + # LLM provider events use a sentinel namespace per §5.5; route + # them to the dedicated Generation path. + if event.namespace == LLM_NAMESPACE: + if not self.disable_llm_spans: + self._handle_llm_event(event) + return + if event.phase == "started": + self._open_started_observation(event) + elif event.phase == "completed": + self._handle_completed(event) + # checkpoint_saved and checkpoint_migrated are OTel-mapping- + # specific synthetic phases per §5.5 / §10.8; the Langfuse + # mapping doesn't surface checkpoint events as observations + # in v0.23.0 (§8.10's deferral envelope). + + # ------------------------------------------------------------------ + # Span observation lifecycle (node / subgraph / fan-out) + # ------------------------------------------------------------------ + + def _open_started_observation(self, event: NodeEvent) -> None: + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + invocation_id = current_invocation_id() + if invocation_id is None: + return + correlation_id = current_correlation_id() + + # Lazy Trace open on the first event for this invocation_id. + # The Trace ID equals the invocation_id verbatim per §8.4.1 so + # cross-system lookup is a direct hit. + if invocation_id not in self._inv_states: + self._open_trace(invocation_id, correlation_id, event) + + inv_state = self._inv_states[invocation_id] + key = self._key_for(event) + if key in inv_state.open_observations: + # Idempotent: a second started for the same (namespace, + # attempt_index, fan_out_index) tuple is a no-op (matches + # the OTel observer's behavior under retry-replay). + return + + parent_observation_id = self._resolve_parent_observation_id(inv_state, event) + metadata = self._observation_metadata(event, correlation_id) + handle = self.client.span( + trace_id=inv_state.trace_id, + name=event.node_name, + metadata=metadata, + parent_observation_id=parent_observation_id, + ) + inv_state.open_observations[key] = _OpenObservation(handle=handle) + + def _handle_completed(self, event: NodeEvent) -> None: + from openarmature.observability.correlation import current_invocation_id + + invocation_id = current_invocation_id() + if invocation_id is None: + return + inv_state = self._inv_states.get(invocation_id) + if inv_state is None: + return + key = self._key_for(event) + observation = inv_state.open_observations.pop(key, None) + if observation is None: + return + # Error-category mapping per §8.4.2: error.category → level=ERROR + # + statusMessage=. + if event.error is not None and getattr(event.error, "category", None) is not None: + observation.handle.end(level="ERROR", status_message=event.error.category) + else: + observation.handle.end() + + def _open_trace(self, invocation_id: str, correlation_id: str | None, event: NodeEvent) -> None: + metadata: dict[str, Any] = { + "entry_node": event.node_name, + "spec_version": self.spec_version, + } + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + # §8.6 trace name: caller-supplied invocation label takes + # precedence; entry-node name is the spec-recommended fallback. + # The caller-supplied path lands in proposal 0034 (PR 4) — for + # now only the fallback is wired. + trace_name = event.node_name + self.client.trace(id=invocation_id, name=trace_name, metadata=metadata) + self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) + + def _key_for(self, event: NodeEvent) -> _StackKey: + return (event.namespace, event.attempt_index, event.fan_out_index) + + def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent) -> str | None: + # Walk namespace ancestors looking for the innermost open + # observation; fall back to None (Trace becomes the parent). + # Subgraph dispatch / fan-out per-instance / detached-trace + # parenting are deferred from this version of the observer + # (no fixtures exercise them); future PRs add per-spec-§8.3 + # synthetic dispatch observations. + for prefix_len in range(len(event.namespace) - 1, 0, -1): + prefix = event.namespace[:prefix_len] + for key, observation in inv_state.open_observations.items(): + if key[0] == prefix: + return observation.handle.id + return None + + def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> dict[str, Any]: + # §8.4.2 observation-level mapping. Fields below mirror the + # OTel observer's _node_attrs() output, renamed for Langfuse's + # flat metadata shape (no `openarmature.` namespace prefix — + # Langfuse's metadata bag is per-observation). + metadata: dict[str, Any] = { + "namespace": list(event.namespace), + "step": event.step, + "attempt_index": event.attempt_index, + } + if event.fan_out_index is not None: + metadata["fan_out_index"] = event.fan_out_index + if event.branch_name is not None: + metadata["branch_name"] = event.branch_name + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + if event.fan_out_config is not None: + cfg = event.fan_out_config + metadata["fan_out_item_count"] = cfg.item_count + metadata["fan_out_concurrency"] = 0 if cfg.concurrency is None else cfg.concurrency + metadata["fan_out_error_policy"] = cfg.error_policy + return metadata + + # ------------------------------------------------------------------ + # Generation observation lifecycle (LLM provider events) + # ------------------------------------------------------------------ + + def _handle_llm_event(self, event: NodeEvent) -> None: + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + if not isinstance(event.pre_state, LlmEventPayload): + # Defensive — sentinel-namespaced events MUST carry an + # LlmEventPayload per llm-provider / observability §5.5. + return + invocation_id = current_invocation_id() + if invocation_id is None: + return + payload = event.pre_state + # The Trace MAY not exist yet if the LLM call fires before any + # node `started` event has hit this observer (race-y under + # tests that prepare via `prepare_sync` only). The in-memory + # client tolerates create-on-demand; production SDK adapters + # should too. + if invocation_id not in self._inv_states: + self._open_trace(invocation_id, current_correlation_id(), event) + inv_state = self._inv_states[invocation_id] + correlation_id = current_correlation_id() + + if event.phase == "started": + parent_observation_id = self._resolve_llm_parent_observation_id(inv_state, payload) + metadata, model_parameters, input_value, output_value = self._llm_metadata_and_payload( + payload, correlation_id, phase="started" + ) + handle = self.client.generation( + trace_id=inv_state.trace_id, + name="openarmature.llm.complete", + model=payload.model, + model_parameters=model_parameters, + input=input_value, + output=output_value, + metadata=metadata, + parent_observation_id=parent_observation_id, + prompt=self._resolve_prompt_link(payload), + ) + inv_state.open_llm_observations[payload.call_id] = _OpenObservation(handle=handle) + return + + # completed: pop the started handle and finalize. + observation = inv_state.open_llm_observations.pop(payload.call_id, None) + if observation is None: + return + metadata, _model_parameters, _input_value, output_value = self._llm_metadata_and_payload( + payload, correlation_id, phase="completed" + ) + end_kwargs: dict[str, Any] = {"metadata": metadata} + if output_value is not None: + end_kwargs["output"] = output_value + usage = self._usage_from_payload(payload) + if usage is not None: + end_kwargs["usage"] = usage + # Error-category mapping: §8.4.2 + §8.4.3 (an LLM provider + # error_category lands on the Generation observation's level + # and statusMessage the same as on a Span observation). + if payload.error_category is not None: + end_kwargs["level"] = "ERROR" + end_kwargs["status_message"] = payload.error_category + observation.handle.end(**end_kwargs) + + def _resolve_llm_parent_observation_id( + self, inv_state: _InvState, payload: LlmEventPayload + ) -> str | None: + # Calling-node identity comes from the payload (set at + # dispatch time per llm-provider §5.5). Resolve the calling + # node's open observation; fall back to None (Trace parent) + # if not found. + key: _StackKey = ( + payload.calling_namespace_prefix, + payload.calling_attempt_index, + payload.calling_fan_out_index, + ) + observation = inv_state.open_observations.get(key) + if observation is not None: + return observation.handle.id + return None + + def _llm_metadata_and_payload( + self, + payload: LlmEventPayload, + correlation_id: str | None, + *, + phase: str, + ) -> tuple[dict[str, Any], dict[str, Any], Any, Any]: + # Returns (metadata, model_parameters, input, output) for the + # generation(...) / .end(...) call. Phase-specific filtering + # keeps the started call lean (input only) and the completed + # call focused on the output + usage + response metadata. + metadata: dict[str, Any] = {} + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + # gen_ai.system → metadata.system per §8.4.3 + metadata["system"] = payload.genai_system + # Prompt-identity metadata (§8.4.4 always-on, independent of + # whether a Langfuse Prompt entity link is established). + active_prompt = payload.active_prompt + if active_prompt is not None: + metadata["prompt"] = { + "name": active_prompt.name, + "version": active_prompt.version, + "label": active_prompt.label, + "template_hash": active_prompt.template_hash, + "rendered_hash": active_prompt.rendered_hash, + } + active_group = payload.active_prompt_group + if active_group is not None: + metadata["prompt_group_name"] = active_group.group_name + + model_parameters: dict[str, Any] = {} + request_params = payload.request_params or {} + # Per §8.4.3: every gen_ai.request. attribute lifts to + # generation.modelParameters. by inclusion. The §5.5.2 + # source set keys this on (temperature, max_tokens, top_p, + # seed, frequency_penalty, presence_penalty, stop_sequences as + # of v0.24.0); new request-param attrs added in future spec + # versions flow through automatically. + for key, value in request_params.items(): + model_parameters[key] = value + + # Input/output payload gated by disable_llm_payload (§8.7). + input_value: Any = None + output_value: Any = None + if not self.disable_llm_payload: + if phase == "started" and payload.input_messages is not None: + # The payload's input_messages is already image- + # redacted at the provider per §5.5.5 (inline image + # bytes never reach the observer). Serialize and + # compare against the configured cap; under cap the + # native shape is fine, over cap §8.7 says preserve + # the raw truncated string with the marker. + input_value = self._maybe_truncate_for_input(payload.input_messages) + if phase == "completed" and payload.output_content is not None: + output_value = self._maybe_truncate_for_output(payload.output_content) + if phase == "started" and payload.request_extras: + # request_extras renders into metadata, not the input + # field, per §8.4.3 (`metadata.request_extras`). + metadata["request_extras"] = self._maybe_truncate_for_extras(dict(payload.request_extras)) + + # Response metadata fields land on the completed call (§8.4.3). + if phase == "completed": + if payload.finish_reason is not None: + metadata["finish_reason"] = payload.finish_reason + if payload.response_model is not None: + metadata["response_model"] = payload.response_model + if payload.response_id is not None: + metadata["response_id"] = payload.response_id + + return metadata, model_parameters, input_value, output_value + + def _usage_from_payload(self, payload: LlmEventPayload) -> LangfuseUsage | None: + # Map OA usage fields onto the Langfuse Usage record per + # §8.4.3. Returns None when no usage was reported (all three + # token fields None) so the Generation observation reflects + # absence rather than zeroed counts. + if ( + payload.prompt_tokens is None + and payload.completion_tokens is None + and payload.total_tokens is None + ): + return None + return LangfuseUsage( + input=payload.prompt_tokens, + output=payload.completion_tokens, + total=payload.total_tokens, + ) + + def _resolve_prompt_link(self, payload: LlmEventPayload) -> Any: + # §8.4.4 case discrimination: the trigger is whether the + # prompt's source exposes a Langfuse Prompt reference, not + # which specific backend produced it. PromptResult has + # observability_entities['langfuse_prompt'] populated when + # case 1 applies; absent otherwise. + active_prompt = payload.active_prompt + if active_prompt is None: + return None + # PromptResult is typed Any on LlmEventPayload to avoid a + # cross-package import (see llm_event.py for the rationale); + # read defensively. + entities = getattr(active_prompt, "observability_entities", None) + if not isinstance(entities, dict): + return None + return cast("dict[str, Any]", entities).get("langfuse_prompt") + + def _maybe_truncate_for_input(self, value: Any) -> Any: + # Returns the native value when it fits the cap, or the + # truncated string-with-marker when it doesn't. §8.7's + # "raw truncated string" rule: the unparseable JSON IS the + # truncation signal, surfacing the marker rather than faking + # a parse. + serialized = self._serialize_payload_value(value) + truncated = _truncate(serialized, self.payload_byte_cap) + if truncated is None: + return value # fits cap, native shape preserved + return truncated + + def _maybe_truncate_for_output(self, value: str) -> str: + # generation.output is a plain string in Langfuse's shape; + # apply the cap directly to the source string. + truncated = _truncate(value, self.payload_byte_cap) + return truncated if truncated is not None else value + + def _maybe_truncate_for_extras(self, value: dict[str, Any]) -> Any: + # request_extras goes on metadata as a native dict when it + # fits; falls through to the raw truncated string when it + # doesn't, matching §8.7's parse-fallthrough story. + serialized = self._serialize_payload_value(value) + truncated = _truncate(serialized, self.payload_byte_cap) + if truncated is None: + return value + return truncated + + @staticmethod + def _serialize_payload_value(value: Any) -> str: + # Mirrors observability/otel/observer.py's _serialize_for_attribute + # so both observers see the same string under the same cap. + return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + + +def _truncate(serialized: str, cap_bytes: int) -> str | None: + # Returns None when the serialized form fits within cap_bytes, + # or the truncated-with-marker string otherwise. Mirrors the OTel + # observer's _truncate_for_attribute algorithm (UTF-8 code-point + # boundary backtracking, marker append). + encoded = serialized.encode("utf-8") + full_length = len(encoded) + if full_length <= cap_bytes: + return None + marker = _TRUNCATION_MARKER_TEMPLATE.format(m=full_length) + marker_bytes = marker.encode("utf-8") + target = cap_bytes - len(marker_bytes) + if target <= 0: + return marker + boundary = target + while boundary > 0 and (encoded[boundary] & 0b1100_0000) == 0b1000_0000: + boundary -= 1 + return encoded[:boundary].decode("utf-8", errors="strict") + marker + + +__all__ = ["LangfuseObserver"] diff --git a/tests/conformance/test_fixture_parsing.py b/tests/conformance/test_fixture_parsing.py index e87bac3..24ef1cd 100644 --- a/tests/conformance/test_fixture_parsing.py +++ b/tests/conformance/test_fixture_parsing.py @@ -49,9 +49,20 @@ def _id(case: tuple[str, Path]) -> str: # typed directives. The Langfuse harness lands in the next PR of # the batch (PR 3) and adds the matching directive model; # deferring here keeps the parser tests passing in the meantime. - "observability/022-langfuse-basic-trace": "Langfuse harness lands in PR 3 (proposal 0031)", - "observability/023-langfuse-generation-rendering": "Langfuse harness lands in PR 3 (proposal 0031)", - "observability/024-langfuse-prompt-linkage": "Langfuse harness lands in PR 3 (proposal 0031)", + # Langfuse fixtures (022-024) use a directive shape the cross- + # capability parser doesn't model (`langfuse_observer:`, + # `expected.langfuse_trace`, `prompt_backend:`). The capability- + # specific harness at tests/conformance/test_observability_langfuse.py + # parses these directly via yaml + tailored helpers. + "observability/022-langfuse-basic-trace": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), + "observability/023-langfuse-generation-rendering": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), + "observability/024-langfuse-prompt-linkage": ( + "Langfuse shape models live in the dedicated test_observability_langfuse harness" + ), # proposal 0034 caller-supplied invocation metadata fixtures (PR 4). "observability/027-langfuse-caller-supplied-metadata": "Caller-metadata harness lands in PR 4 (0034)", "observability/028-caller-metadata-namespace-rejection": "Caller-metadata harness lands in PR 4 (0034)", diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py new file mode 100644 index 0000000..b158d20 --- /dev/null +++ b/tests/conformance/test_observability_langfuse.py @@ -0,0 +1,531 @@ +# Spec mapping (observability §8): drives the three Langfuse mapping +# fixtures (022 basic trace, 023 Generation rendering + truncation, 024 +# prompt linkage) against the in-memory LangfuseObserver client. Sibling +# of test_observability.py (OTel mapping); shares no harness state with +# the OTel side — each fixture builds its own graph + observer instance. + +"""Run spec observability Langfuse conformance fixtures (022-024).""" + +from __future__ import annotations + +import json +from collections.abc import Mapping, Sequence +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, cast + +import httpx +import pytest +import yaml + +from openarmature.graph import END, GraphBuilder +from openarmature.llm import OpenAIProvider +from openarmature.llm.response import RuntimeConfig +from openarmature.observability.langfuse import ( + InMemoryLangfuseClient, + LangfuseObservation, + LangfuseObserver, + LangfuseTrace, +) +from openarmature.prompts import ( + Prompt, + PromptManager, + SamplingConfig, +) +from openarmature.prompts.context import with_active_prompt + +from .adapter import build_state_cls + +CONFORMANCE_DIR = ( + Path(__file__).resolve().parents[2] / "openarmature-spec" / "spec" / "observability" / "conformance" +) + + +_LANGFUSE_FIXTURES = frozenset( + { + "022-langfuse-basic-trace", + "023-langfuse-generation-rendering", + "024-langfuse-prompt-linkage", + } +) + + +def _fixture_paths() -> list[Path]: + return sorted(p for p in CONFORMANCE_DIR.glob("[0-9][0-9][0-9]-*.yaml") if p.stem in _LANGFUSE_FIXTURES) + + +def _fixture_id(path: Path) -> str: + return path.stem + + +def _load(path: Path) -> dict[str, Any]: + return cast("dict[str, Any]", yaml.safe_load(path.read_text())) + + +# --------------------------------------------------------------------------- +# Mock LLM transport +# --------------------------------------------------------------------------- + + +def _build_mock_llm_handler(responses: list[dict[str, Any]]) -> httpx.MockTransport: + queue = list(responses) + + def _handler(_request: httpx.Request) -> httpx.Response: + if not queue: + raise AssertionError("mock_llm queue exhausted") + spec_resp = queue.pop(0) + body = cast("dict[str, Any]", spec_resp.get("body") or {}) + return httpx.Response( + int(spec_resp.get("status", 200)), + content=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + + return httpx.MockTransport(_handler) + + +# --------------------------------------------------------------------------- +# Mock prompt backend (fixture 024) +# --------------------------------------------------------------------------- + + +class _MockPromptBackend: + """Returns canned Prompts for fixture 024. + + Two flavors per the fixture YAML's ``prompt_backend.type``: + + - ``mock_with_langfuse_reference``: attaches the supplied + ``langfuse_prompt_reference`` sentinel under + ``Prompt.observability_entities['langfuse_prompt']``. Verifies + §8.4.4 case 1 (Generation linked to Prompt entity). + - ``filesystem``: no Langfuse reference attached. Verifies §8.4.4 + case 2 (metadata-only). + """ + + def __init__(self, prompts: dict[str, dict[str, Any]], *, with_langfuse_reference: bool) -> None: + self._prompts: dict[str, Prompt] = {} + now = datetime.now(UTC) + for prompt_name, spec in prompts.items(): + observability_entities: dict[str, Any] | None = None + if with_langfuse_reference and "langfuse_prompt_reference" in spec: + observability_entities = {"langfuse_prompt": spec["langfuse_prompt_reference"]} + self._prompts[prompt_name] = Prompt( + name=spec["name"], + version=spec["version"], + label=spec["label"], + template=spec["template"], + template_hash=spec["template_hash"], + fetched_at=now, + observability_entities=observability_entities, + ) + + async def fetch(self, name: str, label: str = "production") -> Prompt: + return self._prompts[name] + + +# --------------------------------------------------------------------------- +# Fixture runner +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("fixture_path", _fixture_paths(), ids=_fixture_id) +async def test_langfuse_fixture(fixture_path: Path) -> None: + spec = _load(fixture_path) + if "cases" in spec: + for case in cast("list[dict[str, Any]]", spec["cases"]): + try: + await _run_case(case) + except AssertionError as e: + raise AssertionError(f"case {case.get('name')!r}: {e}") from e + else: + await _run_case(spec) + + +async def _run_case(case: Mapping[str, Any]) -> None: + # ---- Mock LLM transport (if the graph has an LLM call) + mock_responses = cast("list[dict[str, Any]] | None", case.get("mock_llm")) + transport = _build_mock_llm_handler(mock_responses) if mock_responses else None + provider: OpenAIProvider | None = None + if transport is not None: + provider = OpenAIProvider( + base_url="http://mock-llm.test", + model=_resolve_llm_model(case), + api_key="test", + transport=transport, + ) + + # ---- Prompt backend (fixture 024) + prompt_manager: PromptManager | None = None + prompt_backend_spec = cast("dict[str, Any] | None", case.get("prompt_backend")) + if prompt_backend_spec is not None: + backend_type = prompt_backend_spec.get("type") + prompts_block = cast("dict[str, dict[str, Any]]", prompt_backend_spec.get("prompts") or {}) + backend = _MockPromptBackend( + prompts_block, + with_langfuse_reference=(backend_type == "mock_with_langfuse_reference"), + ) + prompt_manager = PromptManager(backend) + + # ---- Graph build + state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) + state_cls = build_state_cls("LangfuseFixtureState", state_fields) + nodes_spec = cast("dict[str, Any]", case["nodes"]) + entry = cast("str", case["entry"]) + edges = cast("list[dict[str, str]]", case["edges"]) + render_variables = cast("dict[str, Any]", case.get("render_variables") or {}) + + builder = GraphBuilder(state_cls) + for node_name, node_spec in nodes_spec.items(): + node_body = _build_node_body( + node_name=node_name, + node_spec=cast("dict[str, Any]", node_spec), + provider=provider, + prompt_manager=prompt_manager, + render_variables=render_variables, + ) + builder.add_node(node_name, node_body) + for edge in edges: + target_raw = edge["to"] + target = END if target_raw == "END" else target_raw + builder.add_edge(edge["from"], target) + builder.set_entry(entry) + graph = builder.compile() + + # ---- Observer + observer_cfg = cast("dict[str, Any]", case.get("langfuse_observer") or {}) + observer_kwargs: dict[str, Any] = {} + if "disable_llm_payload" in observer_cfg: + observer_kwargs["disable_llm_payload"] = bool(observer_cfg["disable_llm_payload"]) + if "disable_llm_spans" in observer_cfg: + observer_kwargs["disable_llm_spans"] = bool(observer_cfg["disable_llm_spans"]) + if "payload_byte_cap" in observer_cfg: + observer_kwargs["payload_byte_cap"] = int(observer_cfg["payload_byte_cap"]) + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client, **observer_kwargs) + graph.attach_observer(observer) + + # ---- Run + initial_state_cls = graph.state_cls + correlation_id = cast("str | None", case.get("caller_correlation_id")) + invoke_kwargs: dict[str, Any] = {} + if correlation_id is not None: + invoke_kwargs["correlation_id"] = correlation_id + await graph.invoke(initial_state_cls(), **invoke_kwargs) + await graph.drain() + if provider is not None: + await provider.aclose() + + # ---- Assert + expected = cast("dict[str, Any]", case["expected"]) + expected_trace = cast("dict[str, Any]", expected["langfuse_trace"]) + assert len(client.traces) == 1, f"expected exactly one Trace, got {len(client.traces)}" + trace = next(iter(client.traces.values())) + _assert_trace( + trace, expected_trace, expected_invariants=cast("dict[str, Any]", expected.get("invariants") or {}) + ) + + +def _resolve_llm_model(case: Mapping[str, Any]) -> str: + # Single LLM call per fixture today; pick up the per-call model if + # supplied (fixture 023 explicitly sets `model: "test-model"` on + # the calls_llm block). + nodes_spec = cast("dict[str, Any]", case["nodes"]) + for node_spec in nodes_spec.values(): + if not isinstance(node_spec, dict): + continue + node_dict = cast("dict[str, Any]", node_spec) + calls_llm = node_dict.get("calls_llm") + if isinstance(calls_llm, dict): + return cast("str", cast("dict[str, Any]", calls_llm).get("model", "test-model")) + return "test-model" + + +def _build_node_body( + *, + node_name: str, + node_spec: dict[str, Any], + provider: OpenAIProvider | None, + prompt_manager: PromptManager | None, + render_variables: dict[str, Any], +) -> Any: + # Three node shapes in fixtures 022-024: + # - `update: {...}` — set fields on state directly (022). + # - `calls_llm: {...}` — invoke provider.complete (023). + # - `renders_prompt: ` + optional `calls_llm` — render the + # named prompt, then call the LLM under `with_active_prompt` + # so the Generation's prompt-linkage metadata + entity link + # populate per §8.4.4 (024). + update_spec = cast("dict[str, Any] | None", node_spec.get("update")) + if update_spec is not None: + + async def _node(_s: Any) -> dict[str, Any]: + return dict(update_spec) + + return _node + + calls_llm_spec = cast("dict[str, Any] | None", node_spec.get("calls_llm")) + renders_prompt_name = cast("str | None", node_spec.get("renders_prompt")) + + async def _llm_node(_s: Any) -> dict[str, Any]: + assert provider is not None, f"node {node_name!r} has calls_llm but no mock_llm responses" + messages_spec = cast( + "list[dict[str, Any]] | None", + (calls_llm_spec or {}).get("messages"), + ) + config_spec = cast("dict[str, Any] | None", (calls_llm_spec or {}).get("config")) + stores_in = cast("str", (calls_llm_spec or {}).get("stores_response_in", "msg")) + if renders_prompt_name is not None: + assert prompt_manager is not None, "renders_prompt requires a prompt_backend block" + prompt = await prompt_manager.fetch(renders_prompt_name) + rendered = prompt_manager.render(prompt, render_variables) + llm_messages = list(rendered.messages) + with with_active_prompt(rendered): + response = await provider.complete( + cast("Sequence[Any]", llm_messages), + config=_runtime_config_from_spec(config_spec), + ) + else: + llm_messages = _materialize_messages(messages_spec or []) + response = await provider.complete( + cast("Sequence[Any]", llm_messages), + config=_runtime_config_from_spec(config_spec), + ) + return {stores_in: response.message.content or ""} + + return _llm_node + + +def _materialize_messages(raw: list[dict[str, Any]]) -> list[Any]: + from openarmature.llm.messages import AssistantMessage, SystemMessage, ToolMessage, UserMessage + + out: list[Any] = [] + for entry in raw: + role = entry.get("role") + content: Any + if "content_repeat" in entry: + repeat = cast("dict[str, Any]", entry["content_repeat"]) + content = cast("str", repeat["char"]) * int(repeat["bytes"]) + else: + content = entry.get("content") + if role == "system": + out.append(SystemMessage(content=cast("str", content))) + elif role == "user": + out.append(UserMessage(content=cast("str", content))) + elif role == "assistant": + out.append(AssistantMessage(content=cast("str", content))) + elif role == "tool": + out.append( + ToolMessage(content=cast("str", content), tool_call_id=cast("str", entry["tool_call_id"])) + ) + else: + raise AssertionError(f"unknown message role: {role!r}") + return out + + +def _runtime_config_from_spec(config_spec: dict[str, Any] | None) -> RuntimeConfig | None: + if not config_spec: + return None + declared = { + "temperature", + "max_tokens", + "top_p", + "seed", + "frequency_penalty", + "presence_penalty", + "stop_sequences", + } + kwargs = {k: v for k, v in config_spec.items() if k in declared} + extras = cast("dict[str, Any]", config_spec.get("extras") or {}) + kwargs.update(extras) + return RuntimeConfig(**kwargs) + + +# --------------------------------------------------------------------------- +# Assertion helpers +# --------------------------------------------------------------------------- + + +def _assert_trace( + trace: LangfuseTrace, + expected: dict[str, Any], + *, + expected_invariants: dict[str, Any], +) -> None: + _assert_string_or_placeholder("trace.id", trace.id, expected.get("id")) + if "name" in expected: + _assert_string_or_placeholder("trace.name", trace.name, expected.get("name")) + expected_metadata = cast("dict[str, Any]", expected.get("metadata") or {}) + _assert_metadata_subset("trace.metadata", trace.metadata, expected_metadata) + expected_observations = cast("list[dict[str, Any]]", expected.get("observations") or []) + root_observations = trace.children_of(None) + _assert_observation_tree(trace, root_observations, expected_observations) + + # Invariants: cross-cutting checks that hold across the full Trace. + if expected_invariants.get("trace_id_equals_invocation_id"): + # No direct accessor for the invocation_id from outside the + # observer; the §8.4.1 contract is that trace.id == invocation_id, + # so the invariant degenerates to "trace.id matches the UUIDv4 + # pattern" — already asserted above via `` placeholder. + pass + if expected_invariants.get("correlation_id_consistency"): + trace_correlation = cast("str | None", trace.metadata.get("correlation_id")) + if trace_correlation is not None: + for obs in trace.observations: + obs_correlation = obs.metadata.get("correlation_id") + assert obs_correlation == trace_correlation, ( + f"correlation_id mismatch: trace={trace_correlation!r}, " + f"observation {obs.name!r}={obs_correlation!r}" + ) + + +def _assert_observation_tree( + trace: LangfuseTrace, + actual_children: list[LangfuseObservation], + expected_children: list[dict[str, Any]], +) -> None: + assert len(actual_children) == len(expected_children), ( + f"observation children count mismatch: expected {len(expected_children)}, got {len(actual_children)}" + ) + for actual, expected in zip(actual_children, expected_children, strict=True): + _assert_observation(trace, actual, expected) + + +def _assert_observation( + trace: LangfuseTrace, + actual: LangfuseObservation, + expected: dict[str, Any], +) -> None: + if "type" in expected: + assert actual.type == expected["type"], ( + f"observation {actual.name!r} type: expected {expected['type']!r}, got {actual.type!r}" + ) + if "name" in expected: + _assert_string_or_placeholder(f"observation[{actual.name}].name", actual.name, expected["name"]) + if "level" in expected: + assert actual.level == expected["level"], ( + f"observation {actual.name!r} level: expected {expected['level']!r}, got {actual.level!r}" + ) + if "model" in expected: + assert actual.model == expected["model"], ( + f"observation {actual.name!r} model: expected {expected['model']!r}, got {actual.model!r}" + ) + if "modelParameters" in expected: + expected_params = cast("dict[str, Any]", expected["modelParameters"]) + for key, value in expected_params.items(): + assert actual.model_parameters.get(key) == value, ( + f"observation {actual.name!r} modelParameters.{key}: " + f"expected {value!r}, got {actual.model_parameters.get(key)!r}" + ) + if "usage" in expected: + expected_usage = cast("dict[str, Any]", expected["usage"]) + assert actual.usage is not None, f"observation {actual.name!r} usage absent" + for key, value in expected_usage.items(): + actual_value = getattr(actual.usage, key, None) + assert actual_value == value, ( + f"observation {actual.name!r} usage.{key}: expected {value!r}, got {actual_value!r}" + ) + if "input_parses_as_messages" in expected: + expected_messages = cast("list[dict[str, Any]]", expected["input_parses_as_messages"]) + # input is the native message-list shape OR a JSON string of it; + # either way, parse-to-shape MUST succeed and match. + parsed = _parse_messages(actual.input) + # The actual messages are produced by the LLM provider's + # _serialize_messages_for_payload which carries the full §3 shape + # (including ``content`` as either str or block list). Loose + # subset compare on role+content for fixture parity. + for expected_msg, parsed_msg in zip(expected_messages, parsed, strict=False): + assert parsed_msg.get("role") == expected_msg["role"], ( + f"observation {actual.name!r} input message role mismatch: " + f"expected {expected_msg['role']!r}, got {parsed_msg.get('role')!r}" + ) + assert parsed_msg.get("content") == expected_msg["content"], ( + f"observation {actual.name!r} input message content mismatch: " + f"expected {expected_msg['content']!r}, got {parsed_msg.get('content')!r}" + ) + if expected.get("input_is_raw_string_with_marker") is True: + assert isinstance(actual.input, str), ( + f"observation {actual.name!r} input expected raw string, got {type(actual.input).__name__}" + ) + assert "[truncated," in actual.input, ( + f"observation {actual.name!r} input missing truncation marker: {actual.input!r}" + ) + if "output" in expected: + assert actual.output == expected["output"], ( + f"observation {actual.name!r} output: expected {expected['output']!r}, got {actual.output!r}" + ) + if "prompt_entity_link" in expected: + assert actual.prompt_entity_link == expected["prompt_entity_link"], ( + f"observation {actual.name!r} prompt_entity_link: " + f"expected {expected['prompt_entity_link']!r}, got {actual.prompt_entity_link!r}" + ) + if expected.get("prompt_entity_link_absent") is True: + assert actual.prompt_entity_link is None, ( + f"observation {actual.name!r} prompt_entity_link expected absent, " + f"got {actual.prompt_entity_link!r}" + ) + expected_metadata = cast("dict[str, Any]", expected.get("metadata") or {}) + _assert_metadata_subset(f"observation[{actual.name}].metadata", actual.metadata, expected_metadata) + + expected_children = cast("list[dict[str, Any]]", expected.get("children") or []) + actual_children = trace.children_of(actual.id) + _assert_observation_tree(trace, actual_children, expected_children) + + +def _parse_messages(value: Any) -> list[dict[str, Any]]: + if isinstance(value, list): + return cast("list[dict[str, Any]]", value) + if isinstance(value, str): + try: + decoded = json.loads(value) + except json.JSONDecodeError as exc: + raise AssertionError(f"input attribute did not parse as JSON: {value!r}") from exc + if isinstance(decoded, list): + return cast("list[dict[str, Any]]", decoded) + raise AssertionError(f"input attribute did not parse as a message list: {value!r}") + + +def _assert_metadata_subset( + label: str, + actual: Mapping[str, Any], + expected: Mapping[str, Any], +) -> None: + # Subset-compare: every key in `expected` MUST be present in + # `actual` with the corresponding value (or matching placeholder); + # additional keys in `actual` are tolerated. + for key, expected_value in expected.items(): + assert key in actual, f"{label}: missing key {key!r}; got keys {sorted(actual)}" + actual_value = actual[key] + if isinstance(expected_value, str) and ( + expected_value == "" or expected_value.startswith(" 0, ( + f"{label}.{key}: expected placeholder match, got {actual_value!r}" + ) + continue + if isinstance(expected_value, dict) and isinstance(actual_value, dict): + _assert_metadata_subset( + f"{label}.{key}", + cast("Mapping[str, Any]", actual_value), + cast("Mapping[str, Any]", expected_value), + ) + continue + assert actual_value == expected_value, ( + f"{label}.{key}: expected {expected_value!r}, got {actual_value!r}" + ) + + +def _assert_string_or_placeholder(label: str, actual: str | None, expected: Any) -> None: + if expected is None: + return + if isinstance(expected, str) and (expected == "" or expected == ""): + assert isinstance(actual, str) and len(actual) > 0, ( + f"{label}: expected non-empty string, got {actual!r}" + ) + return + assert actual == expected, f"{label}: expected {expected!r}, got {actual!r}" + + +# Unused helper kept for the SamplingConfig import to avoid a future +# F401 when 024's prompt-sampling path lands. +_ = SamplingConfig diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index 9f2f844..2c4386f 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -40,6 +40,7 @@ "07-multimodal-prompt", "08-checkpointing-and-migration", "09-tool-use", + "10-langfuse-observability", ] diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py new file mode 100644 index 0000000..57c7f8a --- /dev/null +++ b/tests/unit/test_observability_langfuse.py @@ -0,0 +1,116 @@ +"""Focused unit tests for the LangfuseObserver and InMemoryLangfuseClient. + +The conformance suite (``tests/conformance/test_observability_langfuse.py``) +exercises the end-to-end Trace + Observation shape against +spec/observability/conformance/022-024. These unit tests fill gaps +those fixtures don't isolate directly: payload-cap validation, +truncation algorithm boundaries, in-memory recorder field handling. +""" + +from __future__ import annotations + +import pytest + +from openarmature.observability.langfuse import ( + InMemoryLangfuseClient, + LangfuseObserver, + LangfuseUsage, +) + + +def test_observer_payload_cap_below_minimum_rejected() -> None: + # §5.5.5 minimum-cap mirror — 255 sits one byte below the spec + # minimum and MUST be rejected at construction time. + client = InMemoryLangfuseClient() + with pytest.raises(ValueError, match="below the spec §5.5.5 minimum"): + LangfuseObserver(client=client, payload_byte_cap=255) + + +def test_observer_payload_cap_at_minimum_accepted() -> None: + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client, payload_byte_cap=256) + assert observer.payload_byte_cap == 256 + + +def test_in_memory_recorder_trace_create_then_update() -> None: + client = InMemoryLangfuseClient() + client.trace(id="t1", name="initial", metadata={"correlation_id": "c1"}) + client.update_trace(id="t1", name="renamed", metadata={"extra": "value"}) + + trace = client.traces["t1"] + assert trace.id == "t1" + assert trace.name == "renamed" + assert trace.metadata == {"correlation_id": "c1", "extra": "value"} + + +def test_in_memory_recorder_span_handle_update_and_end() -> None: + client = InMemoryLangfuseClient() + client.trace(id="t1") + handle = client.span(trace_id="t1", name="step", metadata={"k": 1}) + + handle.update(metadata={"extra": "v"}) + handle.end(level="ERROR", status_message="failed") + + trace = client.traces["t1"] + assert len(trace.observations) == 1 + obs = trace.observations[0] + assert obs.name == "step" + assert obs.ended is True + assert obs.level == "ERROR" + assert obs.status_message == "failed" + assert obs.metadata == {"k": 1, "extra": "v"} + + +def test_in_memory_recorder_generation_captures_native_fields() -> None: + client = InMemoryLangfuseClient() + client.trace(id="t1") + handle = client.generation( + trace_id="t1", + name="openarmature.llm.complete", + model="test-model", + model_parameters={"temperature": 0.7}, + input=[{"role": "user", "content": "hi"}], + output="hello back", + usage=LangfuseUsage(input=5, output=2, total=7), + prompt="lf-prompt-ref-1", + ) + handle.end(metadata={"finish_reason": "stop"}) + + trace = client.traces["t1"] + assert len(trace.observations) == 1 + obs = trace.observations[0] + assert obs.type == "generation" + assert obs.model == "test-model" + assert obs.model_parameters == {"temperature": 0.7} + assert obs.input == [{"role": "user", "content": "hi"}] + assert obs.output == "hello back" + assert obs.usage is not None + assert obs.usage.input == 5 + assert obs.usage.output == 2 + assert obs.usage.total == 7 + assert obs.prompt_entity_link == "lf-prompt-ref-1" + assert obs.metadata == {"finish_reason": "stop"} + + +def test_in_memory_recorder_observation_id_is_unique_per_recorder() -> None: + client = InMemoryLangfuseClient() + client.trace(id="t1") + a = client.span(trace_id="t1", name="a") + b = client.span(trace_id="t1", name="b") + assert a.id != b.id + + +def test_in_memory_recorder_children_of_walks_parent_links() -> None: + client = InMemoryLangfuseClient() + client.trace(id="t1") + root = client.span(trace_id="t1", name="root") + child = client.span(trace_id="t1", name="child", parent_observation_id=root.id) + other = client.span(trace_id="t1", name="other") + + trace = client.traces["t1"] + top_level = trace.children_of(None) + assert {o.name for o in top_level} == {"root", "other"} + root_children = trace.children_of(root.id) + assert [o.name for o in root_children] == ["child"] + # Unrelated observation not under root. + assert child.id != other.id From 4ae7c7d0e17267e14de4fae8f0bc532502282af4 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 08:58:18 -0700 Subject: [PATCH 2/2] Document deferred behavior in Langfuse observer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four doc-only additions from PR review: - _resolve_parent_observation_id now explicitly notes the longest-prefix-first outer-loop walk and calls out the first-match approximation when multiple open observations share a namespace prefix. The proper subgraph_observations / fan_out_instance_observations / detached_roots maps land in a follow-on PR; the current resolver covers the linear-graph and basic-LLM cases the v0.23.0 conformance fixtures exercise. - _maybe_truncate_for_input and _maybe_truncate_for_extras docstrings document the intentional list|str and dict|str union return types. When the serialized payload exceeds payload_byte_cap the helpers return the marker-bearing string verbatim per spec §8.7; the unparseable JSON IS the truncation signal and the Langfuse UI renders the string view in that case. - LangfuseClient.update_trace gets a comment explaining why it's declared in the Protocol despite the current observer not invoking it. The caller-supplied invocation-label path (proposal 0034, PR 4 of the v0.10.0 batch) may need to swap the trace name after the first node event opens the Trace; SDK adapters implement it for forward compatibility with that wiring. - Concepts and example docs now disclose that no specific langfuse SDK version is validated in CI for this release. A follow-on release pins a tested [langfuse] extras range and ships a runtime Protocol-conformance check; until then production wire-up is a "verify in your own environment" path. Behavior unchanged. AGENTS.md regenerated to pick up the example doc note. --- docs/concepts/observability.md | 13 ++++++ docs/examples/10-langfuse-observability.md | 9 ++++ .../observability/langfuse/client.py | 8 ++++ .../observability/langfuse/observer.py | 44 +++++++++++++------ 4 files changed, 61 insertions(+), 13 deletions(-) diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 1b06549..38b3d0b 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -618,6 +618,19 @@ small adapter; see [`examples/10-langfuse-observability`](../examples/10-langfuse-observability.md) for the runnable demo plus the adapter shape. +!!! note "Langfuse SDK version compatibility" + + No specific `langfuse` SDK version is validated in CI as of this + release. The Protocol mirrors the SDK's documented low-level + `trace` / `span` / `generation` shape, but the SDK has shifted + between major versions (v2 → v3 introduced API changes). A + follow-on release pins a tested `[langfuse]` extras range and + ships a runtime `isinstance` check confirming the SDK satisfies + the Protocol. Until then, treat production wire-up as a "verify + in your own environment" path: bring the langfuse version your + stack already uses, run a smoke trace, and write a thin adapter + if any kwargs don't line up. + ### What Langfuse sees - **Trace ID = invocation ID.** The Trace's `id` is the OA diff --git a/docs/examples/10-langfuse-observability.md b/docs/examples/10-langfuse-observability.md index 0d42ca3..6bd00c9 100644 --- a/docs/examples/10-langfuse-observability.md +++ b/docs/examples/10-langfuse-observability.md @@ -135,6 +135,15 @@ small adapter class that implements `LangfuseClient` and delegates to the SDK call-by-call. The Protocol surface is narrow — four methods — so the adapter is on the order of 40 lines. +**No specific `langfuse` SDK version is validated in CI as of this +release.** The Protocol matches the SDK's documented low-level shape, +but `langfuse` has shifted between major versions (v2 → v3 introduced +API changes). A follow-on release pins a tested `[langfuse]` extras +range and a runtime `isinstance(client, LangfuseClient)` check; until +then, smoke-trace in your own environment with whichever `langfuse` +version your stack already uses and write a thin adapter if any +kwargs don't line up. + For prompt linkage: in production, the `Prompt.observability_entities['langfuse_prompt']` value is the SDK's own Prompt-entity object (returned by `langfuse_client.get_prompt(...)`) diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 8a55280..0685387 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -166,6 +166,14 @@ def trace( """ ... + # The current observer doesn't invoke this method — it sets the + # Trace's full metadata + entry-node-name fallback at creation. + # The Protocol still declares it because the caller-supplied + # invocation-label path (proposal 0034, PR 4) may need to swap + # the trace name AFTER the first node event opens the Trace; in + # that case the observer calls update_trace mid-invocation to + # apply the label. SDK adapters implement this for forward + # compatibility with PR 4's wiring. def update_trace( self, *, diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index d4dcd0d..520a70d 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -233,12 +233,24 @@ def _key_for(self, event: NodeEvent) -> _StackKey: return (event.namespace, event.attempt_index, event.fan_out_index) def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent) -> str | None: - # Walk namespace ancestors looking for the innermost open - # observation; fall back to None (Trace becomes the parent). - # Subgraph dispatch / fan-out per-instance / detached-trace - # parenting are deferred from this version of the observer - # (no fixtures exercise them); future PRs add per-spec-§8.3 - # synthetic dispatch observations. + # Walk namespace ancestors longest-prefix-first looking for the + # innermost open observation; fall back to None (Trace becomes + # the parent). The outer loop counts down from len(namespace)-1 + # so the deepest matching ancestor wins. + # + # First-match-by-iteration is approximate when multiple open + # observations share the same namespace prefix at different + # _StackKey slots (multiple retry attempts, multiple fan-out + # instances). In practice retry middleware ends one attempt + # before opening the next, so concurrent same-namespace + # observations only arise under fan-out. Spec §8.3 mandates + # dedicated dispatch Span observations for subgraphs and + # per-instance fan-out spans; those land in a follow-on PR + # alongside dedicated subgraph_observations / + # fan_out_instance_observations maps mirroring the OTel + # observer's structure. Until then this resolver covers the + # linear-graph and basic-LLM cases the v0.23.0 conformance + # fixtures exercise. for prefix_len in range(len(event.namespace) - 1, 0, -1): prefix = event.namespace[:prefix_len] for key, observation in inv_state.open_observations.items(): @@ -461,11 +473,14 @@ def _resolve_prompt_link(self, payload: LlmEventPayload) -> Any: return cast("dict[str, Any]", entities).get("langfuse_prompt") def _maybe_truncate_for_input(self, value: Any) -> Any: - # Returns the native value when it fits the cap, or the - # truncated string-with-marker when it doesn't. §8.7's - # "raw truncated string" rule: the unparseable JSON IS the - # truncation signal, surfacing the marker rather than faking - # a parse. + # Returns the native value (list of message dicts) when it + # fits the cap, or the truncated marker-bearing string when + # it doesn't. The list-or-str union return is intentional per + # spec §8.7: the unparseable JSON IS the truncation signal — + # surfacing the marker preserves the diagnostic without + # faking a parse, and the Langfuse UI renders the string view + # rather than the structured-input view. Callers MUST NOT + # assume the return value is JSON-parseable. serialized = self._serialize_payload_value(value) truncated = _truncate(serialized, self.payload_byte_cap) if truncated is None: @@ -480,8 +495,11 @@ def _maybe_truncate_for_output(self, value: str) -> str: def _maybe_truncate_for_extras(self, value: dict[str, Any]) -> Any: # request_extras goes on metadata as a native dict when it - # fits; falls through to the raw truncated string when it - # doesn't, matching §8.7's parse-fallthrough story. + # fits, or the truncated marker-bearing string when it + # doesn't. The dict-or-str union return mirrors + # _maybe_truncate_for_input's intentional shape per spec §8.7: + # the unparseable JSON IS the truncation signal, and the + # Langfuse UI renders the string view in that case. serialized = self._serialize_payload_value(value) truncated = _truncate(serialized, self.payload_byte_cap) if truncated is None: