Skip to content

feat(telemetry): CloudEvents + Langfuse tracing as projections over a captured LLM stream#845

Open
akattelu wants to merge 3 commits into
mainfrom
aakash/cloudevents-tracing
Open

feat(telemetry): CloudEvents + Langfuse tracing as projections over a captured LLM stream#845
akattelu wants to merge 3 commits into
mainfrom
aakash/cloudevents-tracing

Conversation

@akattelu

@akattelu akattelu commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

What

LLM call tracing where each call is captured once (CapturedLLMCall) and fanned out to multiple exporters — "one data model, two projections":

  • CloudEvents trace stream (llm.call.traced / trace.content) — feeds the parquet / ML substrate.
  • Langfuse (LangfuseExporter) — reconstructs trace → run → step → generation from the same captured stream.

Highlights

  • Capture seam (src/llm/capture.py): single canonicalization + content-addressed hashing point; O(N) per-span memo so repeated context isn't re-hashed/re-clipped on the hot path.
  • Session correlation: session_id (the opaque Session.id, namespaced only at the Langfuse boundary) threaded telemetry → captured call → exporters.
  • Span identity consolidated onto LLMTelemetryContext; dropped the TRACE_ENDPOINT knob.
  • Canonical generation/step names; dreamer branches nest under one dream trace; tool calls become spans under their step.
  • Dedup registries (trace_session, langfuse_session) bounded by an LRU so dedup/grouping survive long-running workers.
  • Embedding-call tracing; deterministic high-volume sampling.

Config / rollout

  • LANGFUSE_EXPORTER_MODE — default exporter; inline keeps the legacy live @observe path one release for side-by-side validation (the env-flip rollback).
  • Code defaults stay off (TELEMETRY_ENABLED / TELEMETRY_TRACE_PAYLOADS = false) so self-hosted doesn't auto-emit. Prod enables tracing by setting TELEMETRY_TRACE_PAYLOADS=true (telemetry endpoint already configured for billing).

Testing

Added unit tests and verified unified test e2e runs in langfuse

Summary by CodeRabbit

  • New Features

    • Added opt-in full-fidelity telemetry payload tracing for LLM and embedding activity, including purpose allowlisting and payload size limits.
    • Introduced replay-grade captured-call trace exporting (including Langfuse projection).
    • Added trace-viewer configuration and a Langfuse exporter mode switch (inline vs exporter).
  • Bug Fixes

    • Improved end-to-end trace/span/session correlation across single-shot, agentic, tools, and streamed results.
    • Reduced trace-content duplication and hardened tracing so export failures don’t affect LLM execution.
  • Tests

    • Expanded coverage for payload capture, hashing/dedup, trace emission, Langfuse nesting, and cross-agent trace semantics.

@coderabbitai

coderabbitai Bot commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Walkthrough

Adds full-fidelity telemetry tracing for LLM, embedding, and summary calls. The PR introduces captured-call export paths, trace-content deduplication, Langfuse span reuse and projection, and new settings plus call-site updates for trace, span, session, and step correlation.

Changes

Telemetry tracing pipeline

Layer / File(s) Summary
Tracing settings and startup gating
.env.template, src/config.py, src/telemetry/emitter.py, src/telemetry/events/__init__.py, src/telemetry/logging.py, src/utils/agent_tools.py, tests/telemetry/test_emit_function.py, tests/utils/test_clients.py
Payload-tracing settings, trace-emitter startup, and inline/exporter gating are wired through config, telemetry initialization, and tool observation helpers.
Captured call model and streaming finalization
src/llm/types.py, src/llm/capture.py, tests/llm/test_capture.py
LLMTelemetryContext and the capture helpers add span-tree fields, canonical hashing/clipping, captured-call records, exporter registry helpers, and streamed capture finalization.
Span and session identity propagation
src/deriver/deriver.py, src/dialectic/chat.py, src/dialectic/core.py, src/dreamer/specialists.py, src/embedding_client.py, src/llm/runtime.py, src/utils/summarizer.py, src/utils/types.py, tests/telemetry/test_cross_agent_trace.py, tests/telemetry/test_embedding_trace.py, tests/llm/test_langfuse_trace_annotation.py
Agent, embedding, and summary call sites mint or pass trace_id, span_id, parent_span_id, and session_id, and Langfuse annotation now keys off parent_span_id and span_identity.
Executor and tool-loop capture dispatch
src/llm/executor.py, src/llm/tool_loop.py, tests/llm/test_telemetry_agent_iteration.py
honcho_llm_call_inner and execute_tool_loop build step_seq-aware telemetry copies, seed hash memoization, and dispatch captured calls after completion or streamed drain.
Trace payload events, dedup, and export
src/telemetry/events/trace.py, src/telemetry/trace_session.py, src/telemetry/trace_exporter.py, tests/telemetry/conftest.py, tests/telemetry/test_trace_events.py
Trace payload event types, per-run content deduplication, and the CloudEvents trace exporter now emit unique trace.content and llm.call.traced records.
Langfuse trace projection
src/telemetry/langfuse_session.py, src/telemetry/langfuse_exporter.py, tests/telemetry/test_cross_agent_trace.py, tests/telemetry/test_langfuse_exporter.py
Langfuse session reuse and exporter code project captured calls into trace, run, step, generation, and tool observations, including Dream-root handling.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~90+ minutes

Possibly related PRs

  • plastic-labs/honcho#773: Shares src/dialectic/chat.py session handling around tracked_db(..., read_only=True) and DialecticAgent invocation.
  • plastic-labs/honcho#814: Shares Langfuse runtime and trace-attribution changes in src/llm/runtime.py and related span/session handling.
  • plastic-labs/honcho#849: Shares Langfuse generation-span instrumentation and inline/exporter gating in src/llm/runtime.py, src/telemetry/logging.py, and related call sites.

Suggested reviewers

  • VVoruganti

Poem

I hopped through traces by moonlit byte,
With spans and hashes tucked in tight.
One burrowed call, then many a gleam,
All stitched together in a tracey stream. 🐇
The carrot logs now sparkle bright.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 41.63% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: projecting a captured LLM stream into CloudEvents and Langfuse tracing.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch aakash/cloudevents-tracing

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@akattelu akattelu force-pushed the aakash/cloudevents-tracing branch from 70b9119 to 5faa210 Compare June 25, 2026 16:43
@akattelu akattelu marked this pull request as ready for review June 25, 2026 16:49

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (1)
src/telemetry/events/__init__.py (1)

192-201: 🩺 Stability & Availability | 🔵 Trivial | 💤 Low value

Preserve the exception detail in emit_trace.

Unlike emit() (which captures to Sentry and logs the error), this handler logs only the event type name and drops the actual exception. When the trace path misbehaves, there's no breadcrumb to diagnose it. Consider exc_info=True so the swallowed error is still recoverable from debug logs.

♻️ Suggested tweak
-    except Exception:  # pragma: no cover - best-effort telemetry
-        logger.debug("Failed to emit trace event %s", type(event).__name__)
+    except Exception:  # pragma: no cover - best-effort telemetry
+        logger.debug(
+            "Failed to emit trace event %s", type(event).__name__, exc_info=True
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/telemetry/events/__init__.py` around lines 192 - 201, `emit_trace` is
swallowing the underlying exception and only logging the event type, which
removes useful debug context. Update the exception handler in
`src.telemetry.events.__init__.py` so the `except Exception` block preserves the
error details, ideally by logging with `exc_info=True` alongside the existing
message. Keep the `get_trace_emitter()` / `emitter.emit(event)` flow unchanged,
and make sure the fallback debug log still identifies the event via
`type(event).__name__`.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/dreamer/specialists.py`:
- Around line 295-299: The specialist tracing in the run/span setup is reusing
the shared dream trace id as the span id, which makes deduction and induction
runs indistinguishable when they share parent_run_id. Update the span creation
logic in the specialist run path so trace_id stays set to run_id, but span_id is
minted uniquely per specialist execution whenever parent_run_id is present. Use
the surrounding run/span construction in src/dreamer/specialists.py to locate
the change and keep the root span behavior intact while ensuring each specialist
gets a distinct span_id.

In `@src/llm/capture.py`:
- Around line 74-92: `clip_for_trace` currently skips clipping for non-string
content, so structured `CapturedMessage.content` (lists/dicts/tool results) can
still exceed `TRACE_MAX_BYTES`. Update `clip_for_trace` in `src/llm/capture.py`
to enforce the byte cap for structured payloads as well, either by recursively
clipping string leaves or by safely capping the serialized representation before
hashing/exporting. Keep the existing behavior and signature of `clip_for_trace`
while ensuring all content types returned from `CapturedMessage.content` are
bounded.

In `@src/llm/tool_loop.py`:
- Around line 526-528: The no-tool early-return branch in tool_loop.py reuses
the same step_seq for both the non-stream call and the streamed tail call,
causing identical trace resource ids and deduplication collisions. Update the
stream_telemetry setup in the early-return path so the tail call gets a
distinct, next step_seq value consistent with the LLMTelemetryContext contract
and the synthesis branch behavior. Use the existing _telemetry_for_iteration
helper and the nearby _call_with_messages flow as the reference points when
adjusting the sequence value.

In `@src/llm/types.py`:
- Around line 137-149: The exported_parent_span_id normalization in types.py
only checks parent_span_id against span_id, so legacy or run-only cases where
parent_span_id matches run_id can still export a self-parent instead of None.
Update exported_parent_span_id to use the same identity fallback logic as
span_identity(), comparing against both span_id and run_id as needed, and keep
the self-parent collapse behavior returning None for exported traces.

In `@src/telemetry/events/trace.py`:
- Around line 146-147: EmbeddingCallTracedEvent.get_resource_id() is too coarse
and can collide because it only uses span_id, call_purpose, and input_count.
Update the resource key logic in EmbeddingCallTracedEvent to include iteration
and step_seq, following the LLMCallTracedEvent pattern, and make sure the
EmbeddingCallTracedEvent instantiation path in embedding_client.py provides
those values so distinct embedding steps in the same span do not dedupe
incorrectly.

In `@tests/llm/test_capture.py`:
- Around line 109-116: The test_clips_oversized_string assertion is too
permissive because it allows TRACE_MAX_BYTES plus the truncation marker length,
so it won’t catch outputs that exceed the configured cap. Update the final
assert in test_clips_oversized_string to verify clip_for_trace returns content
whose UTF-8 byte length stays within settings.TELEMETRY.TRACE_MAX_BYTES, while
still checking the truncated flag and marker suffix.

---

Nitpick comments:
In `@src/telemetry/events/__init__.py`:
- Around line 192-201: `emit_trace` is swallowing the underlying exception and
only logging the event type, which removes useful debug context. Update the
exception handler in `src.telemetry.events.__init__.py` so the `except
Exception` block preserves the error details, ideally by logging with
`exc_info=True` alongside the existing message. Keep the `get_trace_emitter()` /
`emitter.emit(event)` flow unchanged, and make sure the fallback debug log still
identifies the event via `type(event).__name__`.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8f3d8214-c1b4-4947-a0b6-d9bbb3e8447f

📥 Commits

Reviewing files that changed from the base of the PR and between 60a15e6 and 5faa210.

📒 Files selected for processing (33)
  • .env.template
  • src/config.py
  • src/deriver/deriver.py
  • src/dialectic/chat.py
  • src/dialectic/core.py
  • src/dreamer/specialists.py
  • src/embedding_client.py
  • src/llm/capture.py
  • src/llm/executor.py
  • src/llm/runtime.py
  • src/llm/tool_loop.py
  • src/llm/types.py
  • src/telemetry/emitter.py
  • src/telemetry/events/__init__.py
  • src/telemetry/events/trace.py
  • src/telemetry/langfuse_exporter.py
  • src/telemetry/langfuse_session.py
  • src/telemetry/logging.py
  • src/telemetry/trace_exporter.py
  • src/telemetry/trace_session.py
  • src/utils/agent_tools.py
  • src/utils/summarizer.py
  • src/utils/types.py
  • tests/llm/test_capture.py
  • tests/llm/test_langfuse_trace_annotation.py
  • tests/llm/test_telemetry_agent_iteration.py
  • tests/telemetry/conftest.py
  • tests/telemetry/test_cross_agent_trace.py
  • tests/telemetry/test_embedding_trace.py
  • tests/telemetry/test_emit_function.py
  • tests/telemetry/test_langfuse_exporter.py
  • tests/telemetry/test_trace_events.py
  • tests/utils/test_clients.py

Comment thread src/dreamer/specialists.py Outdated
Comment thread src/llm/capture.py
Comment thread src/llm/tool_loop.py
Comment thread src/llm/types.py
Comment thread src/telemetry/events/trace.py
Comment thread tests/llm/test_capture.py Outdated
akattelu and others added 2 commits June 26, 2026 14:43
… captured LLM stream

Capture each LLM call once (CapturedLLMCall) and fan it out to multiple
exporters -- "one data model, two projections": a CloudEvents trace stream
(llm.call.traced / trace.content) and a Langfuse projection, both reconstructing
trace -> run -> step -> generation from the same source of truth.

- Capture seam (src/llm/capture.py): one canonicalization + content-addressed
  hashing point, with an O(N) per-span memo so repeated context isn't re-hashed.
- Session correlation threaded telemetry -> captured call -> exporters,
  namespaced only at the Langfuse export boundary.
- Span identity consolidated onto LLMTelemetryContext; dropped TRACE_ENDPOINT.
- Canonical generation/step names; dreamer branches nest under one dream trace;
  tool calls become spans under their step.
- LANGFUSE_EXPORTER_MODE toggle ("exporter" default; "inline" kept one release
  for side-by-side validation), centralized into computed settings predicates.
- Per-run/per-trace dedup registries (trace_session, langfuse_session) bounded
  by an LRU so dedup and span grouping survive long-running workers.
- Embedding-call tracing; deterministic high-volume event sampling.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ns, test, logging)

- Dreamer specialists mint a distinct span_id per execution (trace_id stays the
  shared dream run_id), so their CloudEvents trace resource ids no longer collide
  between deduction and induction.
- Tool-loop no-tool early-return streams the tail with the next ordinal
  (iteration+2) instead of reusing the in-loop call's step_seq, avoiding a
  colliding trace resource id; mirrors the synthesis path.
- Tighten test_clips_oversized_string to assert output stays within TRACE_MAX_BYTES.
- emit_trace logs the swallowed exception with exc_info for debuggability.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@akattelu akattelu force-pushed the aakash/cloudevents-tracing branch from 2cb9d67 to 80341c6 Compare June 26, 2026 18:50

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/llm/types.py (1)

283-324: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Preserve the provider stream finish reason.

A normally drained stream is always finalized as "stop", even if the final chunk reported another finish reason such as "length". Track the last non-empty chunk finish reason and pass that to the capture finalizer.

Proposed fix
         accumulated_text: list[str] = []
         stream_error: BaseException | None = None
+        stream_finish_reason: str | None = None
         try:
             async for chunk in self._stream:
                 if chunk.output_tokens is not None:
@@
                 if accumulate and chunk.content:
                     accumulated_text.append(chunk.content)
+                chunk_finish_reason = getattr(chunk, "finish_reason", None)
+                if chunk_finish_reason:
+                    stream_finish_reason = chunk_finish_reason
                 yield chunk
@@
                 finish_reason = (
-                    "stop"
+                    stream_finish_reason or "stop"
                     if stream_error is None
                     else (
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/llm/types.py` around lines 283 - 324, The stream finalization in the
async iterator is overwriting the provider’s reported finish reason with "stop"
whenever the stream drains normally. Update the streaming logic in the iterator
method to remember the last non-empty chunk finish reason while iterating over
self._stream, then use that preserved value when calling the capture finalizer
in the finally block instead of defaulting to "stop". Keep the existing
error/cancel handling paths intact, and make sure the finish reason passed
through the finalizer reflects the provider’s actual terminal reason when
available.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/config.py`:
- Around line 1401-1415: The langfuse_exporter_enabled property can report
enabled even when no call data is captured, so update the telemetry config to
keep it consistent with honcho_llm_call_inner and initialize_telemetry_events.
Either include TELEMETRY.TRACE_PAYLOADS in langfuse_exporter_enabled alongside
LANGFUSE_PUBLIC_KEY and LANGFUSE_EXPORTER_MODE, or add a model validator in
Config to reject the exporter mode when trace payload capture is off.
- Around line 1189-1192: Add settings-time validation for TRACE_PURPOSES in the
config model so invalid purpose strings fail fast instead of silently disabling
tracing. Update the settings class in src/config.py to validate the list against
CallPurpose when loading, using the existing TRACE_PURPOSES field and the same
validation pattern used for LLMTelemetryContext.call_purpose. Reject unknown
values with a clear error before TraceExporter.export() ever uses the allowlist.

In `@src/llm/executor.py`:
- Around line 468-486: The inline Langfuse generation IO call is still gated
only by LANGFUSE_PUBLIC_KEY, so exporter mode still enters the legacy annotate
path and builds the payload unnecessarily. Update the gate in the executor flow
around annotate_current_generation_io and _langfuse_model_parameters to use
langfuse_inline_enabled so inline generation IO only runs when inline mode is
actually enabled, preventing duplicate updates and the extra model_dump-backed
cost.

In `@src/llm/runtime.py`:
- Around line 124-140: The inline Langfuse update helper still runs whenever
LANGFUSE_PUBLIC_KEY is set, even in exporter mode. Update this helper to use the
same mode-aware gate as annotate_current_langfuse_trace so exporter-mode calls
do not reach the legacy get_client().update_current_generation path. Keep the
existing payload assembly, but short-circuit before the Langfuse client call
when exporter mode is active.

In `@src/telemetry/events/__init__.py`:
- Around line 218-228: Duplicate Langfuse exporter registration can happen when
telemetry is initialized more than once, because the current
`register_exporter()` call in `src.telemetry.events.__init__` creates a fresh
`LangfuseExporter()` each time and dedupes only by object identity. Update the
initialization path around the `settings.langfuse_exporter_enabled` block to
guard against repeated registration by reusing a single exporter instance or
checking whether `LangfuseExporter` is already registered before calling
`register_exporter()`. Make the same change in the other duplicated
initialization path noted by the comment so repeated telemetry setup does not
add multiple exporters.

---

Outside diff comments:
In `@src/llm/types.py`:
- Around line 283-324: The stream finalization in the async iterator is
overwriting the provider’s reported finish reason with "stop" whenever the
stream drains normally. Update the streaming logic in the iterator method to
remember the last non-empty chunk finish reason while iterating over
self._stream, then use that preserved value when calling the capture finalizer
in the finally block instead of defaulting to "stop". Keep the existing
error/cancel handling paths intact, and make sure the finish reason passed
through the finalizer reflects the provider’s actual terminal reason when
available.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 165c1f5a-a77e-481b-b15e-60ba971597cc

📥 Commits

Reviewing files that changed from the base of the PR and between 2cb9d67 and 80341c6.

📒 Files selected for processing (33)
  • .env.template
  • src/config.py
  • src/deriver/deriver.py
  • src/dialectic/chat.py
  • src/dialectic/core.py
  • src/dreamer/specialists.py
  • src/embedding_client.py
  • src/llm/capture.py
  • src/llm/executor.py
  • src/llm/runtime.py
  • src/llm/tool_loop.py
  • src/llm/types.py
  • src/telemetry/emitter.py
  • src/telemetry/events/__init__.py
  • src/telemetry/events/trace.py
  • src/telemetry/langfuse_exporter.py
  • src/telemetry/langfuse_session.py
  • src/telemetry/logging.py
  • src/telemetry/trace_exporter.py
  • src/telemetry/trace_session.py
  • src/utils/agent_tools.py
  • src/utils/summarizer.py
  • src/utils/types.py
  • tests/llm/test_capture.py
  • tests/llm/test_langfuse_trace_annotation.py
  • tests/llm/test_telemetry_agent_iteration.py
  • tests/telemetry/conftest.py
  • tests/telemetry/test_cross_agent_trace.py
  • tests/telemetry/test_embedding_trace.py
  • tests/telemetry/test_emit_function.py
  • tests/telemetry/test_langfuse_exporter.py
  • tests/telemetry/test_trace_events.py
  • tests/utils/test_clients.py
✅ Files skipped from review due to trivial changes (3)
  • tests/utils/test_clients.py
  • .env.template
  • tests/telemetry/test_emit_function.py
🚧 Files skipped from review as they are similar to previous changes (21)
  • src/dreamer/specialists.py
  • tests/telemetry/test_embedding_trace.py
  • src/embedding_client.py
  • src/telemetry/logging.py
  • src/utils/agent_tools.py
  • src/deriver/deriver.py
  • tests/llm/test_telemetry_agent_iteration.py
  • src/telemetry/langfuse_session.py
  • src/utils/types.py
  • tests/llm/test_langfuse_trace_annotation.py
  • tests/telemetry/test_cross_agent_trace.py
  • src/dialectic/core.py
  • src/dialectic/chat.py
  • src/telemetry/trace_exporter.py
  • tests/llm/test_capture.py
  • src/utils/summarizer.py
  • src/telemetry/events/trace.py
  • tests/telemetry/test_trace_events.py
  • src/telemetry/emitter.py
  • src/llm/tool_loop.py
  • tests/telemetry/conftest.py

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/llm/types.py (1)

283-324: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Preserve the provider stream finish reason.

A normally drained stream is always finalized as "stop", even if the final chunk reported another finish reason such as "length". Track the last non-empty chunk finish reason and pass that to the capture finalizer.

Proposed fix
         accumulated_text: list[str] = []
         stream_error: BaseException | None = None
+        stream_finish_reason: str | None = None
         try:
             async for chunk in self._stream:
                 if chunk.output_tokens is not None:
@@
                 if accumulate and chunk.content:
                     accumulated_text.append(chunk.content)
+                chunk_finish_reason = getattr(chunk, "finish_reason", None)
+                if chunk_finish_reason:
+                    stream_finish_reason = chunk_finish_reason
                 yield chunk
@@
                 finish_reason = (
-                    "stop"
+                    stream_finish_reason or "stop"
                     if stream_error is None
                     else (
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/llm/types.py` around lines 283 - 324, The stream finalization in the
async iterator is overwriting the provider’s reported finish reason with "stop"
whenever the stream drains normally. Update the streaming logic in the iterator
method to remember the last non-empty chunk finish reason while iterating over
self._stream, then use that preserved value when calling the capture finalizer
in the finally block instead of defaulting to "stop". Keep the existing
error/cancel handling paths intact, and make sure the finish reason passed
through the finalizer reflects the provider’s actual terminal reason when
available.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/config.py`:
- Around line 1401-1415: The langfuse_exporter_enabled property can report
enabled even when no call data is captured, so update the telemetry config to
keep it consistent with honcho_llm_call_inner and initialize_telemetry_events.
Either include TELEMETRY.TRACE_PAYLOADS in langfuse_exporter_enabled alongside
LANGFUSE_PUBLIC_KEY and LANGFUSE_EXPORTER_MODE, or add a model validator in
Config to reject the exporter mode when trace payload capture is off.
- Around line 1189-1192: Add settings-time validation for TRACE_PURPOSES in the
config model so invalid purpose strings fail fast instead of silently disabling
tracing. Update the settings class in src/config.py to validate the list against
CallPurpose when loading, using the existing TRACE_PURPOSES field and the same
validation pattern used for LLMTelemetryContext.call_purpose. Reject unknown
values with a clear error before TraceExporter.export() ever uses the allowlist.

In `@src/llm/executor.py`:
- Around line 468-486: The inline Langfuse generation IO call is still gated
only by LANGFUSE_PUBLIC_KEY, so exporter mode still enters the legacy annotate
path and builds the payload unnecessarily. Update the gate in the executor flow
around annotate_current_generation_io and _langfuse_model_parameters to use
langfuse_inline_enabled so inline generation IO only runs when inline mode is
actually enabled, preventing duplicate updates and the extra model_dump-backed
cost.

In `@src/llm/runtime.py`:
- Around line 124-140: The inline Langfuse update helper still runs whenever
LANGFUSE_PUBLIC_KEY is set, even in exporter mode. Update this helper to use the
same mode-aware gate as annotate_current_langfuse_trace so exporter-mode calls
do not reach the legacy get_client().update_current_generation path. Keep the
existing payload assembly, but short-circuit before the Langfuse client call
when exporter mode is active.

In `@src/telemetry/events/__init__.py`:
- Around line 218-228: Duplicate Langfuse exporter registration can happen when
telemetry is initialized more than once, because the current
`register_exporter()` call in `src.telemetry.events.__init__` creates a fresh
`LangfuseExporter()` each time and dedupes only by object identity. Update the
initialization path around the `settings.langfuse_exporter_enabled` block to
guard against repeated registration by reusing a single exporter instance or
checking whether `LangfuseExporter` is already registered before calling
`register_exporter()`. Make the same change in the other duplicated
initialization path noted by the comment so repeated telemetry setup does not
add multiple exporters.

---

Outside diff comments:
In `@src/llm/types.py`:
- Around line 283-324: The stream finalization in the async iterator is
overwriting the provider’s reported finish reason with "stop" whenever the
stream drains normally. Update the streaming logic in the iterator method to
remember the last non-empty chunk finish reason while iterating over
self._stream, then use that preserved value when calling the capture finalizer
in the finally block instead of defaulting to "stop". Keep the existing
error/cancel handling paths intact, and make sure the finish reason passed
through the finalizer reflects the provider’s actual terminal reason when
available.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 165c1f5a-a77e-481b-b15e-60ba971597cc

📥 Commits

Reviewing files that changed from the base of the PR and between 2cb9d67 and 80341c6.

📒 Files selected for processing (33)
  • .env.template
  • src/config.py
  • src/deriver/deriver.py
  • src/dialectic/chat.py
  • src/dialectic/core.py
  • src/dreamer/specialists.py
  • src/embedding_client.py
  • src/llm/capture.py
  • src/llm/executor.py
  • src/llm/runtime.py
  • src/llm/tool_loop.py
  • src/llm/types.py
  • src/telemetry/emitter.py
  • src/telemetry/events/__init__.py
  • src/telemetry/events/trace.py
  • src/telemetry/langfuse_exporter.py
  • src/telemetry/langfuse_session.py
  • src/telemetry/logging.py
  • src/telemetry/trace_exporter.py
  • src/telemetry/trace_session.py
  • src/utils/agent_tools.py
  • src/utils/summarizer.py
  • src/utils/types.py
  • tests/llm/test_capture.py
  • tests/llm/test_langfuse_trace_annotation.py
  • tests/llm/test_telemetry_agent_iteration.py
  • tests/telemetry/conftest.py
  • tests/telemetry/test_cross_agent_trace.py
  • tests/telemetry/test_embedding_trace.py
  • tests/telemetry/test_emit_function.py
  • tests/telemetry/test_langfuse_exporter.py
  • tests/telemetry/test_trace_events.py
  • tests/utils/test_clients.py
✅ Files skipped from review due to trivial changes (3)
  • tests/utils/test_clients.py
  • .env.template
  • tests/telemetry/test_emit_function.py
🚧 Files skipped from review as they are similar to previous changes (21)
  • src/dreamer/specialists.py
  • tests/telemetry/test_embedding_trace.py
  • src/embedding_client.py
  • src/telemetry/logging.py
  • src/utils/agent_tools.py
  • src/deriver/deriver.py
  • tests/llm/test_telemetry_agent_iteration.py
  • src/telemetry/langfuse_session.py
  • src/utils/types.py
  • tests/llm/test_langfuse_trace_annotation.py
  • tests/telemetry/test_cross_agent_trace.py
  • src/dialectic/core.py
  • src/dialectic/chat.py
  • src/telemetry/trace_exporter.py
  • tests/llm/test_capture.py
  • src/utils/summarizer.py
  • src/telemetry/events/trace.py
  • tests/telemetry/test_trace_events.py
  • src/telemetry/emitter.py
  • src/llm/tool_loop.py
  • tests/telemetry/conftest.py
🛑 Comments failed to post (5)
src/config.py (2)

1189-1192: 🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Validate TRACE_PURPOSES at settings load.

TraceExporter.export() treats this as an exact allowlist, so a typo here silently suppresses tracing instead of failing fast. Add a validator that rejects unknown purpose strings up front.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/config.py` around lines 1189 - 1192, Add settings-time validation for
TRACE_PURPOSES in the config model so invalid purpose strings fail fast instead
of silently disabling tracing. Update the settings class in src/config.py to
validate the list against CallPurpose when loading, using the existing
TRACE_PURPOSES field and the same validation pattern used for
LLMTelemetryContext.call_purpose. Reject unknown values with a clear error
before TraceExporter.export() ever uses the allowlist.

1401-1415: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

langfuse_exporter_enabled can be true while the exporter gets no data.

initialize_telemetry_events() registers the Langfuse exporter from this property, but honcho_llm_call_inner() skips capture dispatch when TELEMETRY.TRACE_PAYLOADS is false. With the new defaults, Langfuse looks enabled yet never receives captured calls. Gate this on TELEMETRY.TRACE_PAYLOADS too, or reject the incompatible config in a model validator.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/config.py` around lines 1401 - 1415, The langfuse_exporter_enabled
property can report enabled even when no call data is captured, so update the
telemetry config to keep it consistent with honcho_llm_call_inner and
initialize_telemetry_events. Either include TELEMETRY.TRACE_PAYLOADS in
langfuse_exporter_enabled alongside LANGFUSE_PUBLIC_KEY and
LANGFUSE_EXPORTER_MODE, or add a model validator in Config to reject the
exporter mode when trace payload capture is off.
src/llm/executor.py (1)

468-486: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Gate inline generation IO on langfuse_inline_enabled.

In exporter mode, LANGFUSE_PUBLIC_KEY is still set, so these branches still build and call the legacy inline generation IO path. Use the mode-aware gate to avoid duplicate/stray Langfuse updates and the model_dump payload cost when LANGFUSE_EXPORTER_MODE=exporter.

Proposed fix
-    if settings.LANGFUSE_PUBLIC_KEY:
+    if settings.langfuse_inline_enabled:
         annotate_current_generation_io(
             input=messages,
             model_parameters=_langfuse_model_parameters(
@@
-        if settings.LANGFUSE_PUBLIC_KEY:
+        if settings.langfuse_inline_enabled:
             annotate_current_generation_io(
                 output=response,
                 usage_details=_langfuse_usage_details(response),

Also applies to: 572-581

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/llm/executor.py` around lines 468 - 486, The inline Langfuse generation
IO call is still gated only by LANGFUSE_PUBLIC_KEY, so exporter mode still
enters the legacy annotate path and builds the payload unnecessarily. Update the
gate in the executor flow around annotate_current_generation_io and
_langfuse_model_parameters to use langfuse_inline_enabled so inline generation
IO only runs when inline mode is actually enabled, preventing duplicate updates
and the extra model_dump-backed cost.
src/llm/runtime.py (1)

124-140: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Use the mode-aware inline gate here too.

This helper remains active whenever a Langfuse public key exists, even when exporter mode is selected. Make it consistent with annotate_current_langfuse_trace so direct callers cannot hit the legacy inline path in exporter mode.

Proposed fix
-    if not settings.LANGFUSE_PUBLIC_KEY:
+    if not settings.langfuse_inline_enabled:
         return
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    if not settings.langfuse_inline_enabled:
        return
    payload: dict[str, Any] = {}
    if input is not None:
        payload["input"] = input
    if output is not None:
        payload["output"] = output
    if model_parameters:
        payload["model_parameters"] = model_parameters
    if usage_details:
        payload["usage_details"] = usage_details
    if not payload:
        return
    try:
        from langfuse import get_client

        get_client().update_current_generation(**payload)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/llm/runtime.py` around lines 124 - 140, The inline Langfuse update helper
still runs whenever LANGFUSE_PUBLIC_KEY is set, even in exporter mode. Update
this helper to use the same mode-aware gate as annotate_current_langfuse_trace
so exporter-mode calls do not reach the legacy
get_client().update_current_generation path. Keep the existing payload assembly,
but short-circuit before the Langfuse client call when exporter mode is active.
src/telemetry/events/__init__.py (1)

218-228: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Avoid duplicate exporter registration on repeated initialization.

Each call constructs new exporter instances, while register_exporter() only dedupes by object identity. Re-initializing telemetry can register duplicates and export the same captured call multiple times.

Proposed fix
 async def initialize_telemetry_events() -> None:
@@
     from src.config import settings
+    from src.llm.capture import clear_exporters
     from src.telemetry.emitter import initialize_emitter
 
+    clear_exporters()
+
@@
     if settings.langfuse_exporter_enabled:
-        from src.llm.capture import register_exporter
+        from src.llm.capture import register_exporter

Also applies to: 249-265

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/telemetry/events/__init__.py` around lines 218 - 228, Duplicate Langfuse
exporter registration can happen when telemetry is initialized more than once,
because the current `register_exporter()` call in
`src.telemetry.events.__init__` creates a fresh `LangfuseExporter()` each time
and dedupes only by object identity. Update the initialization path around the
`settings.langfuse_exporter_enabled` block to guard against repeated
registration by reusing a single exporter instance or checking whether
`LangfuseExporter` is already registered before calling `register_exporter()`.
Make the same change in the other duplicated initialization path noted by the
comment so repeated telemetry setup does not add multiple exporters.

…zer run_id placeholder

Two CloudEvents/Langfuse correctness fixes, independent of the trace viewer.

Langfuse exporter-mode gating: annotate_current_generation_io (and its two
executor.py call-site guards) were gated on LANGFUSE_PUBLIC_KEY instead of
langfuse_inline_enabled. In the default `exporter` mode they called
get_client().update_current_generation() with no active @observe span, logging
"No active span in current context" (~14 per dialectic run) and building
throwaway model_dump payloads on every LLM call. The LangfuseExporter projects
I/O from the captured stream, so these helpers must no-op in exporter mode.
Gated all three on langfuse_inline_enabled; added a regression test; fixed a
stale conditional_observe docstring.

Summarizer run_id placeholder: AgentToolSummaryCreatedEvent hardcoded
run_id="deriver"/iteration=0 because summarization is a single LLM call, not an
agentic run. That placeholder pollutes run_id grouping in the CloudEvents stream
(any consumer that groups by run_id sees a phantom "deriver" run). Made
run_id/iteration optional (None) and re-keyed get_resource_id on
message_id:summary_type (the real per-summary identity; run_id/iteration can no
longer identify it); bumped schema_version 2->3. Xatu ingestion stores only the
CloudEvent envelope, so the field/resource_id/version changes are transparent to it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/telemetry/test_events.py`:
- Around line 762-766: The current test only validates
sample_summary_created_event, which still includes run_id and iteration, so it
does not cover the nullable-field regression. Update the test around
AgentToolSummaryCreatedEvent and get_resource_id() to construct a non-agentic
summary event directly with run_id=None and iteration=None, then assert the
expected resource id so the contract is exercised without relying on the
fixture.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8458b827-4dea-444f-8b04-4b9b25a27ee1

📥 Commits

Reviewing files that changed from the base of the PR and between 80341c6 and 4f96442.

📒 Files selected for processing (7)
  • src/llm/executor.py
  • src/llm/runtime.py
  • src/telemetry/events/agent.py
  • src/telemetry/logging.py
  • src/utils/summarizer.py
  • tests/llm/test_langfuse_trace_annotation.py
  • tests/telemetry/test_events.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/utils/summarizer.py
  • src/telemetry/logging.py
  • src/llm/runtime.py
  • src/llm/executor.py

Comment on lines +762 to +766
"""get_resource_id() keys on message_id:summary_type (run_id/iteration are
None for the non-agentic summarizer and can't identify the summary)."""
assert (
sample_summary_created_event.get_resource_id()
== "ghi11111:1:summary_created"
== "msg_020:short:summary_created"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

Cover the actual nullable-field regression here.

This assertion still goes through sample_summary_created_event, and that fixture populates run_id/iteration. The new contract is specifically that non-agentic summarizer events omit both fields, so this test won't catch a regression that makes them required again. Please add a case that constructs AgentToolSummaryCreatedEvent(run_id=None, iteration=None, ...) directly.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/telemetry/test_events.py` around lines 762 - 766, The current test only
validates sample_summary_created_event, which still includes run_id and
iteration, so it does not cover the nullable-field regression. Update the test
around AgentToolSummaryCreatedEvent and get_resource_id() to construct a
non-agentic summary event directly with run_id=None and iteration=None, then
assert the expected resource id so the contract is exercised without relying on
the fixture.

@Rajat-Ahuja1997 Rajat-Ahuja1997 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally looks pretty good - i think the overall structure makes sense

Comment thread src/dialectic/core.py
Comment thread src/llm/capture.py
Comment thread .env.template

# Full-fidelity payload tracing (llm.call.traced / trace.content). Default-off
# and additive — leaving TRACE_PAYLOADS unset changes nothing.
# TELEMETRY_TRACE_PAYLOADS=false # Trace events ship to TELEMETRY_ENDPOINT (Xatu), on a separate buffer

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe TELEMETRY_TRACE_PAYLOADS_ENABLED?

Comment thread src/embedding_client.py
Comment on lines +135 to +136
EmbeddingCallTracedEvent(
trace_id=run_id,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make the parent_span_id here the run_id and then make span_id a new nanoid. an agent run can create several different embeddings and all would have the same run_id

Comment on lines +220 to +221
# decoupled from the CloudEvents emitter) — so it works even when
# TELEMETRY.ENABLED is off. Registering the exporter makes has_exporters()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Works even when TELEMETRY.ENABLED" is not currently true per the caller of initialize_telemetry_events so this is a little misleading

Comment on lines +63 to +66
def end_run(run_key: str) -> None:
"""Free a run's hash set. Safe to call for an unknown key."""
with _lock:
_runs.pop(run_key, None)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently unused

Comment on lines +169 to +173
@staticmethod
def _emit_trace(event: LLMCallTracedEvent | TraceContentEvent) -> None:
from src.telemetry.events import emit_trace

emit_trace(event)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't think we need this helper

)
)

def _emit_derived_content(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make _emit_hashed_content? considering the deriver is a honcho primitive, i initially thought that this only emitted content related to the deriver

Comment thread src/llm/types.py
Comment on lines +314 to +316
finish_reason = (
"stop"
if stream_error is None

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we capture the actual finish reason from the llm chunks?

Comment thread src/llm/capture.py
Comment on lines +191 to +194
role = str(message.get("role", ""))
raw_content = message.get("content")
tool_call_id = message.get("tool_call_id")
content, truncated = clip_for_trace(raw_content)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just store the full message object and truncate message.content? per claude, we're currently excluding tool calls made by openai and gemini (which don't record tool calls in the content field).

haven't tested this personally but let's look into it before shipping

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants