Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
LANGFUSE_HOST=https://cloud.langfuse.com
# Comma-separated list of trace attributes to set as Langfuse tags
# Available values: ticket_key, ticket_type, project_id, workflow_step, repo, pr_number, ci_status, event_source, event_type, llm_model
LANGFUSE_TRACE_TAGS=ticket_key,ticket_type,project_id,workflow_step,repo,pr_number,ci_status,event_source,event_type,llm_model
# Comma-separated list of trace attributes to set as Langfuse metadata
# Available values: ticket_key, ticket_type, project_id, workflow_step, repo, pr_number, ci_status, event_source, event_type, retry_count, system_prompt_length, llm_model
LANGFUSE_TRACE_METADATA=ticket_key,ticket_type,project_id,workflow_step,repo,pr_number,ci_status,event_source,event_type,retry_count,system_prompt_length,llm_model

# OpenTelemetry distributed tracing (separate from Langfuse LLM tracing above)
# Enable/disable OTLP trace export
Expand Down
44 changes: 42 additions & 2 deletions src/forge/config.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""Configuration management using Pydantic settings."""

from functools import lru_cache
from typing import Literal
import logging
from functools import cached_property, lru_cache
from typing import TYPE_CHECKING, Literal

from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict

if TYPE_CHECKING:
from forge.integrations.langfuse.fields import TracingField

logger = logging.getLogger(__name__)


class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
Expand Down Expand Up @@ -188,6 +194,14 @@ def detect_model_provider(model_name: str) -> str:
langfuse_host: str = Field(
default="https://cloud.langfuse.com", description="Langfuse host URL"
)
langfuse_trace_tags: str = Field(
default="",
description="Comma-separated list of TracingField names to include as Langfuse trace tags",
)
langfuse_trace_metadata: str = Field(
default="",
description="Comma-separated list of TracingField names to include as Langfuse trace metadata",
)

# Claude Agent SDK Configuration
agent_enable_tools: bool = Field(
Expand Down Expand Up @@ -330,6 +344,32 @@ def langfuse_enabled(self) -> bool:
and self.langfuse_secret_key.get_secret_value()
)

@cached_property
def trace_tag_fields(self) -> list["TracingField"]:
"""Parse and validate configured Langfuse trace tag fields."""
from forge.integrations.langfuse.fields import parse_trace_fields

fields = parse_trace_fields(self.langfuse_trace_tags, allow_tags=True)
if fields:
logger.info(
"Langfuse trace tags configured: %s",
", ".join(f.value for f in fields),
)
return fields

@cached_property
def trace_metadata_fields(self) -> list["TracingField"]:
"""Parse and validate configured Langfuse trace metadata fields."""
from forge.integrations.langfuse.fields import parse_trace_fields

fields = parse_trace_fields(self.langfuse_trace_metadata, allow_tags=False)
if fields:
logger.info(
"Langfuse trace metadata configured: %s",
", ".join(f.value for f in fields),
)
return fields

@property
def use_vertex_ai(self) -> bool:
"""Check if using Vertex AI instead of direct Anthropic API."""
Expand Down
81 changes: 61 additions & 20 deletions src/forge/integrations/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from forge.config import Settings, get_settings
from forge.integrations.langfuse import get_langfuse_config, get_langfuse_context
from forge.integrations.langfuse.fields import resolve_trace_fields
from forge.prompts import load_prompt, set_default_version
from forge.skills.resolver import resolve_skill_paths

Expand All @@ -57,6 +58,29 @@

logger = logging.getLogger(__name__)

_TRACE_FIELD_KEYS = frozenset(
{
"ticket_key",
"ticket_type",
"current_node",
"current_repo",
"current_pr_number",
"ci_status",
"event_type",
"event_source",
"retry_count",
"repo",
"pr_number",
}
)


def _forward_trace_fields(context: dict[str, Any] | None) -> dict[str, Any]:
"""Extract trace-relevant fields from an incoming context dict."""
if not context:
return {}
return {k: v for k, v in context.items() if k in _TRACE_FIELD_KEYS}


def get_weather(city: str) -> str:
"""Placeholder tool for agent testing."""
Expand Down Expand Up @@ -531,6 +555,8 @@ async def _run_agent(
session_id: str | None = None,
trace_name: str | None = None,
ticket_key: str | None = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> str:
"""Run the agent with the given prompt.

Expand All @@ -543,6 +569,8 @@ async def _run_agent(
session_id: Optional session ID for Langfuse (e.g., ticket key).
trace_name: Optional trace name for Langfuse.
ticket_key: Optional ticket key for per-project skill resolution.
tags: Optional list of trace tags for Langfuse.
metadata: Optional metadata dict for Langfuse.

Returns:
Agent response text.
Expand All @@ -564,7 +592,8 @@ async def _run_agent(
langfuse_config = get_langfuse_config(
trace_name=trace_name or "deep_agent_invocation",
session_id=session_id,
metadata={"system_prompt_length": str(len(system_prompt))},
tags=tags,
metadata=metadata,
)
if langfuse_config:
# Extract context params and remove from config
Expand All @@ -579,6 +608,7 @@ async def _run_agent(
session_id=langfuse_ctx_params.get("session_id"),
user_id=langfuse_ctx_params.get("user_id"),
tags=langfuse_ctx_params.get("tags"),
metadata=langfuse_ctx_params.get("metadata"),
):
last_error: Exception | None = None
for attempt in range(self.MAX_RETRIES):
Expand Down Expand Up @@ -688,13 +718,22 @@ async def run_task(
logger.info(f"Running task '{task}' using Deep Agents")
record_agent_invocation(task_type=task)
_start = time.monotonic()
trace_state: dict[str, Any] = {
**(context or {}),
"system_prompt_length": len(system_prompt),
"llm_model": self.settings.claude_model,
}
trace_tags, trace_metadata = resolve_trace_fields(trace_state)

result = await self._run_agent(
prompt=prompt,
system_prompt=system_prompt,
include_tools=include_tools,
session_id=ticket_key,
trace_name=f"task:{task}",
ticket_key=ticket_key,
tags=trace_tags or None,
metadata=trace_metadata or None,
)
observe_agent_duration(task_type=task, duration=time.monotonic() - _start)

Expand Down Expand Up @@ -849,13 +888,12 @@ async def generate_prd(
)

logger.info("Generating PRD using Deep Agents with skill")
task_context = _forward_trace_fields(context)
task_context["project_key"] = context.get("project_key", "") if context else ""
result = await self.run_task(
task="generate-prd",
prompt=prompt,
context={
"ticket_key": context.get("ticket_key", "") if context else "",
"project_key": context.get("project_key", "") if context else "",
},
context=task_context,
)

result = self._strip_preamble(result)
Expand Down Expand Up @@ -885,13 +923,12 @@ async def generate_spec(
)

logger.info("Generating Spec using Deep Agents with skill")
task_context = _forward_trace_fields(context)
task_context["project_key"] = context.get("project_key", "") if context else ""
result = await self.run_task(
task="generate-spec",
prompt=prompt,
context={
"ticket_key": context.get("ticket_key", "") if context else "",
"project_key": context.get("project_key", "") if context else "",
},
context=task_context,
)

result = self._strip_preamble(result)
Expand Down Expand Up @@ -938,15 +975,14 @@ async def generate_epics(
)

logger.info("Generating Epics using Deep Agents with skill")
task_context = _forward_trace_fields(context)
task_context["project_key"] = context.get("project_key", "") if context else ""
task_context["feature_summary"] = context.get("feature_summary", "") if context else ""
task_context["available_repos"] = available_repos
result = await self.run_task(
task="decompose-epics",
prompt=prompt,
context={
"ticket_key": context.get("ticket_key", "") if context else "",
"project_key": context.get("project_key", "") if context else "",
"feature_summary": context.get("feature_summary", "") if context else "",
"available_repos": available_repos,
},
context=task_context,
)

epics = self._parse_epics_response(result)
Expand All @@ -959,13 +995,16 @@ async def regenerate_with_feedback(
feedback: str,
content_type: str,
ticket_key: str | None = None,
context: dict[str, Any] | None = None,
) -> str:
"""Regenerate content incorporating feedback.

Args:
original_content: The original generated content.
feedback: User feedback/revision request.
content_type: Type of content (prd, spec, epic).
ticket_key: Optional ticket key for session tracking.
context: Optional context with trace fields from workflow state.

Returns:
Regenerated content.
Expand All @@ -986,10 +1025,13 @@ async def regenerate_with_feedback(
)

logger.info(f"Regenerating {content_type} with feedback using Deep Agents")
task_context = _forward_trace_fields(context)
task_context["is_revision"] = True
task_context["ticket_key"] = ticket_key or ""
result = await self.run_task(
task=skill_name,
prompt=prompt,
context={"is_revision": True, "ticket_key": ticket_key or ""},
context=task_context,
)

result = self._strip_preamble(result)
Expand Down Expand Up @@ -1074,13 +1116,12 @@ async def answer_question(
)

logger.info(f"Answering question about {artifact_type}")
task_context = _forward_trace_fields(context)
task_context["artifact_type"] = artifact_type
result = await self.run_task(
task="answer-question",
prompt=prompt,
context={
"artifact_type": artifact_type,
"ticket_key": context.get("ticket_key", ""),
},
context=task_context,
# Q&A gets read-only MCP tools for lookups (filtered by agent_mcp_read_only)
)

Expand Down
6 changes: 6 additions & 0 deletions src/forge/integrations/langfuse/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Langfuse integration for LLM observability."""

from forge.integrations.langfuse.fields import (
TracingField,
resolve_trace_fields,
)
from forge.integrations.langfuse.tracing import (
get_langfuse_config,
get_langfuse_context,
Expand All @@ -9,9 +13,11 @@
)

__all__ = [
"TracingField",
"get_langfuse_config",
"get_langfuse_context",
"get_langfuse_handler",
"resolve_trace_fields",
"shutdown_langfuse",
"trace_llm_call",
]
Loading
Loading