Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ LLM_OPENAI_API_KEY=your-api-key-here
# TELEMETRY_MAX_BUFFER_SIZE=10000
# TELEMETRY_NAMESPACE=honcho # Inherits from NAMESPACE if not set

# Full-fidelity payload tracing (llm.call.traced / trace.content). Default-off
# and additive — leaving TELEMETRY_TRACE_PAYLOADS unset changes nothing.
# TELEMETRY_TRACE_PAYLOADS=false # Trace events ship to TELEMETRY_ENDPOINT (Xatu), on a separate buffer
Comment thread
akattelu marked this conversation as resolved.
Outdated
# TELEMETRY_TRACE_MAX_BYTES=262144 # Per-message cap; oversized content is clipped
# TELEMETRY_TRACE_PURPOSES=[] # JSON list of CallPurpose values to capture; empty = all

# =============================================================================
# Cache
# =============================================================================
Expand Down
50 changes: 50 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,21 @@ class TelemetrySettings(HonchoSettings):
# that join high-volume events to aggregate envelopes first.
HIGH_VOLUME_SAMPLE_RATE: Annotated[float, Field(default=1.0, ge=0.0, le=1.0)] = 1.0

# --- Full-fidelity payload tracing (llm.call.traced / trace.content) ---
# Master toggle for replay-grade content capture. Default-off: with this
# False there is ZERO behavior change — no CapturedLLMCall is built, no
# second emitter starts, the metrics events above are untouched.
TRACE_PAYLOADS: bool = False

# Per-message cap (bytes) for captured content; oversized string content is
# clipped (with a marker) and the call is flagged was_truncated.
TRACE_MAX_BYTES: Annotated[int, Field(default=262144, gt=0)] = 262144

# Allowlist of CallPurpose values to capture; empty = all. Typed as str to
# keep the enum out of config (validated against CallPurpose at the producer,
# same pattern as LLMTelemetryContext.call_purpose).
TRACE_PURPOSES: list[str] = Field(default_factory=list)


class CacheSettings(HonchoSettings):
model_config = SettingsConfigDict(env_prefix="CACHE_", extra="ignore") # pyright: ignore
Expand Down Expand Up @@ -1345,6 +1360,17 @@ def _require_api_key_for_turbopuffer(self) -> "VectorStoreSettings":
return self


class TraceViewerSettings(HonchoSettings):
    """Settings for the trace viewer, read from ``TRACE_VIEWER_*`` variables.

    Numeric fields are range-checked at load time, mirroring the
    ``Annotated[..., Field(...)]`` pattern used by ``TelemetrySettings`` (e.g.
    ``TRACE_MAX_BYTES``), so a bad value fails at startup instead of at first
    use.
    """

    model_config = SettingsConfigDict(env_prefix="TRACE_VIEWER_", extra="ignore")  # pyright: ignore

    # Master toggle; the viewer is off unless explicitly enabled.
    ENABLED: bool = False
    # Defaults to the loopback address.
    # NOTE(review): presumably the bind address of the viewer — confirm at the consumer.
    HOST: str = "127.0.0.1"
    # Must be a valid TCP port number; 0 is allowed (OS-assigned port).
    PORT: Annotated[int, Field(default=8002, ge=0, le=65535)] = 8002
    # NOTE(review): presumably where trace files are stored; relative paths
    # would resolve against the process working directory — confirm.
    STORAGE_DIR: str = "./traces"
    # Request size cap in bytes (10 MB by default); must be positive.
    MAX_REQUEST_BYTES: Annotated[
        int, Field(default=10 * 1024 * 1024, gt=0)
    ] = 10 * 1024 * 1024
    # NOTE(review): presumably the base URL for vendored front-end assets — confirm.
    VENDOR_CDN_BASE: str = "https://cdn.jsdelivr.net/npm"


class AppSettings(HonchoSettings):
# No env_prefix for app-level settings
model_config = SettingsConfigDict( # pyright: ignore
Expand All @@ -1364,6 +1390,29 @@ class AppSettings(HonchoSettings):
EMBED_MESSAGES: bool = True
LANGFUSE_HOST: str | None = None
LANGFUSE_PUBLIC_KEY: str | None = None
# How Langfuse traces are produced:
# "exporter" (default) — Langfuse is a projection over the captured
# CapturedLLMCall stream (LangfuseExporter), the same source of truth as
# the CloudEvents trace stream.
# "inline" — legacy live instrumentation (@observe + propagate_attributes
# spans during execution). Kept one release for side-by-side validation.
LANGFUSE_EXPORTER_MODE: Literal["inline", "exporter"] = "exporter"

@property
def langfuse_inline_enabled(self) -> bool:
    """Report whether the legacy inline Langfuse instrumentation is active.

    Inline mode is on only when ``LANGFUSE_PUBLIC_KEY`` is set to a non-empty
    value and ``LANGFUSE_EXPORTER_MODE`` is ``"inline"``.
    """
    # Without a public key Langfuse is considered unconfigured, whatever the mode.
    if not self.LANGFUSE_PUBLIC_KEY:
        return False
    return self.LANGFUSE_EXPORTER_MODE == "inline"

@property
def langfuse_exporter_enabled(self) -> bool:
    """Report whether the Langfuse exporter is active.

    Exporter mode is on only when ``LANGFUSE_PUBLIC_KEY`` is set to a
    non-empty value and ``LANGFUSE_EXPORTER_MODE`` is ``"exporter"``.
    """
    # Without a public key Langfuse is considered unconfigured, whatever the mode.
    if not self.LANGFUSE_PUBLIC_KEY:
        return False
    return self.LANGFUSE_EXPORTER_MODE == "exporter"

# Origins allowed by the FastAPI CORSMiddleware
CORS_ORIGINS: list[str] = [
Expand Down Expand Up @@ -1394,6 +1443,7 @@ class AppSettings(HonchoSettings):
CACHE: CacheSettings = Field(default_factory=CacheSettings)
DREAM: DreamSettings = Field(default_factory=DreamSettings)
VECTOR_STORE: VectorStoreSettings = Field(default_factory=VectorStoreSettings)
TRACE_VIEWER: TraceViewerSettings = Field(default_factory=TraceViewerSettings)

@field_validator("LOG_LEVEL")
def validate_log_level(cls, v: str) -> str:
Expand Down
11 changes: 10 additions & 1 deletion src/deriver/deriver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import time

from nanoid import generate as generate_nanoid

from src import crud
from src.config import ConfiguredModelSettings, settings
from src.crud.representation import RepresentationManager
Expand Down Expand Up @@ -141,7 +143,12 @@ async def process_representation_tasks_batch(
max_tokens = base_model_config.max_output_tokens or settings.LLM.DEFAULT_MAX_TOKENS
model_config = base_model_config

# Single LLM call
# Single LLM call. Mint a root span id so the deriver's single-shot call is
# reconstructable from the trace stream. run_id stays None (the deriver is an
# intentionally sessionless single-shot — leaving run_id unset keeps existing
# LLMCallCompletedEvent resource ids unchanged); trace_id/span_id are new and
# separate.
trace_id = generate_nanoid()
llm_start = time.perf_counter()
response = await honcho_llm_call(
model_config=model_config,
Expand All @@ -159,6 +166,8 @@ async def process_representation_tasks_batch(
parent_category="representation",
observed=observed,
track_name="Minimal Deriver",
trace_id=trace_id,
span_id=trace_id,
),
)
llm_duration = (time.perf_counter() - llm_start) * 1000
Expand Down
8 changes: 8 additions & 0 deletions src/dialectic/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ async def agentic_chat(
session = await crud.get_session(
db, workspace_name=workspace_name, session_name=session_name
)
# Read the opaque Session.id while the instance is still bound; the ORM
# object detaches once this read-only session closes below.
session_id = session.id if session else None
workspace = await crud.get_workspace(db, workspace_name=workspace_name)
configuration = get_configuration(None, session, workspace)

Expand All @@ -68,6 +71,7 @@ async def agentic_chat(
agent = DialecticAgent(
workspace_name=workspace_name,
session_name=session_name,
session_id=session_id,
observer=observer,
observed=observed,
observer_peer_card=observer_peer_card,
Expand Down Expand Up @@ -111,6 +115,9 @@ async def agentic_chat_stream(
session = await crud.get_session(
db, workspace_name=workspace_name, session_name=session_name
)
# Read the opaque Session.id while the instance is still bound; the ORM
# object detaches once this read-only session closes below.
session_id = session.id if session else None
workspace = await crud.get_workspace(db, workspace_name=workspace_name)
configuration = get_configuration(None, session, workspace)

Expand All @@ -129,6 +136,7 @@ async def agentic_chat_stream(
agent = DialecticAgent(
workspace_name=workspace_name,
session_name=session_name,
session_id=session_id,
observer=observer,
observed=observed,
observer_peer_card=observer_peer_card,
Expand Down
13 changes: 13 additions & 0 deletions src/dialectic/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(
observed_peer_card: list[str] | None = None,
metric_key: str | None = None,
reasoning_level: ReasoningLevel = "low",
session_id: str | None = None,
):
"""
Initialize the dialectic agent.
Expand All @@ -81,9 +82,12 @@ def __init__(
observed_peer_card: Biographical information about the observed peer
metric_key: Optional key for logging metrics (if provided, agent won't log separately)
reasoning_level: Level of reasoning to apply
session_id: Opaque Session.id (nanoid PK, NOT session_name) used as the
trace session grouping key. None for global queries.
Comment thread
akattelu marked this conversation as resolved.
Outdated
"""
self.workspace_name: str = workspace_name
self.session_name: str | None = session_name
self.session_id: str | None = session_id
self.observer: str = observer
self.observed: str = observed
self.observer_peer_card: list[str] | None = observer_peer_card
Expand Down Expand Up @@ -179,6 +183,7 @@ async def _prefetch_relevant_observations(self, query: str) -> str | None:
workspace_name=self.workspace_name,
run_id=self._run_id,
parent_category="dialectic",
session_id=self.session_id,
):
query_embedding = await embedding_client.embed(query)

Expand Down Expand Up @@ -316,6 +321,14 @@ def _telemetry_context(self, track_name: str | None = None) -> LLMTelemetryConte
parent_category="dialectic",
agent_type="dialectic",
run_id=self._run_id,
# Root span of this dialectic invocation. Reuse the already-minted
# run_id so trace_id == span_id == run_id — the run-keyed CloudEvents
# stay byte-for-byte unchanged.
trace_id=self._run_id,
span_id=self._run_id,
# Honcho Session.id (conversation grouping). None for global queries.
# Single-turn today; future-proofs multi-turn conversation threading.
session_id=self.session_id,
peer_name=self.observed,
track_name=track_name,
)
Expand Down
9 changes: 9 additions & 0 deletions src/dreamer/specialists.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ async def run(
SpecialistResult with metrics and content
"""
run_id = parent_run_id or generate_nanoid()
# Specialists sharing the orchestrator's run_id (one dream trace) each get a
# distinct span_id so their CloudEvents trace resource ids don't collide;
# trace_id stays run_id so Langfuse still groups them (keyed by agent_type).
span_id = generate_nanoid() if parent_run_id is not None else run_id
task_name = f"dreamer_{self.name}_{run_id}"
start_time = time.perf_counter()

Expand Down Expand Up @@ -292,6 +296,11 @@ async def run(
parent_category="dream",
agent_type=self.name,
run_id=run_id,
# Root span per specialist run (distinct span_id, see above).
# parent_span_id stays None for now; wiring specialists as
# children of a dream-level trace is forking (out of scope).
trace_id=run_id,
span_id=span_id,
observer=observer,
observed=observed,
track_name=f"Dreamer/{self.name}",
Expand Down
24 changes: 24 additions & 0 deletions src/embedding_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def _publish_embedding_event(
get_embedding_call_purpose,
get_embedding_parent_category,
get_embedding_run_id,
get_embedding_session_id,
get_embedding_workspace_name,
)

Expand Down Expand Up @@ -121,6 +122,29 @@ def _publish_embedding_event(
run_id=get_embedding_run_id(),
)
)

# Trace stream (ground-truth) — gated on payload tracing. Joins the
# embedding to the driving agent run: dialectic sets run_id == trace_id
# == span_id, so an embedding made in its prefetch nests under that
# run's trace and carries its session.
if settings.TELEMETRY.TRACE_PAYLOADS:
from src.telemetry.events import EmbeddingCallTracedEvent, emit_trace

run_id = get_embedding_run_id()
emit_trace(
EmbeddingCallTracedEvent(
trace_id=run_id,
Comment thread
akattelu marked this conversation as resolved.
Outdated
span_id=run_id,
session_id=get_embedding_session_id(),
call_purpose=purpose_slug,
parent_category=get_embedding_parent_category(),
provider=provider,
model=model,
provider_input_tokens=input_tokens_estimate,
provider_output_tokens=0,
input_count=input_count,
)
)
except Exception: # pragma: no cover - telemetry must not raise
logger.debug("Failed to emit EmbeddingCallCompletedEvent", exc_info=True)

Expand Down
Loading
Loading