Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,16 @@ PLUGIN_PUBLISHER_ID="your-publisher-id"

# MCP
MCP_OAUTH_ISSUER_URL="https://your-mcp-oauth-issuer-url.com"
MCP_RESOURCE_SERVER_URL="https://your-mcp-resource-server-url.com"
MCP_RESOURCE_SERVER_URL="https://your-mcp-resource-server-url.com"

# Tracing
TRACE_TRACKER_ENABLED=false
TRACE_QUEUE_MAX_SIZE=10000
TRACE_SLOW_REQUEST_MS=1000
TRACE_AGENT_SLOW_OPERATION_MS=30000
TRACE_HEARTBEAT_INTERVAL_SECONDS=30
TRACE_HEALTH_INTERVAL_SECONDS=30
TRACE_RESOURCE_INTERVAL_SECONDS=30
TRACE_HEALTH_TIMEOUT_SECONDS=1
TRACE_AGENT_LOOP_ITERATIONS=20
TRACE_AGENT_TOOL_LOOP_ITERATIONS=20
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<p align="center">
<a href="https://discord.gg/VTngQTaeDf"><img src="https://img.shields.io/badge/Discord-Join%20Lumen%20Brain-5865F2?style=for-the-badge&logo=discord&logoColor=white" alt="Discord"/></a>
<img src="https://img.shields.io/badge/version-2.11.9--dev-blue?style=for-the-badge" alt="Version"/>
<img src="https://img.shields.io/badge/version-2.11.10--dev-blue?style=for-the-badge" alt="Version"/>
<img src="https://img.shields.io/badge/python-3.11+-green?style=for-the-badge&logo=python&logoColor=white" alt="Python"/>
<img src="https://img.shields.io/badge/license-AGPLv3%20%2B%20Commons%20Clause-purple?style=for-the-badge" alt="License"/>
</p>
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "brainapi2"
version = "2.11.9-dev"
version = "2.11.10-dev"
description = "Version 2.x.x of the BrainAPI memory layer."
authors = [
{name = "Christian",email = "alch.infoemail@gmail.com"}
Expand Down
118 changes: 118 additions & 0 deletions src/core/agents/core/invoke_loop.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os

from src.lib.tracing import TraceSeverity, tracer
from src.utils.cleanup import strip_json

from .parsing import (
Expand Down Expand Up @@ -59,7 +62,51 @@ def _log_token_usage(response) -> None:
)


def _trace_metadata(agent, config) -> dict:
metadata = (config or {}).get("metadata") or {}
return {
"agent": metadata.get("agent") or agent.__class__.__name__,
"brain_id": metadata.get("brain_id") or (config or {}).get("brain_id"),
"tags": (config or {}).get("tags") or [],
"tools_count": len(getattr(agent, "tools", []) or []),
"has_output_schema": agent.output_schema is not None,
}


def run_invoke_loop(agent, messages, config):
trace_metadata = _trace_metadata(agent, config)
trace_tenant_id = trace_metadata.get("brain_id")
with tracer.span(
"agent.invoke_loop",
service="brainapi-agent",
operation="invoke_loop",
tenant_id=trace_tenant_id,
metadata=trace_metadata,
slow_operation_ms=float(os.getenv("TRACE_AGENT_SLOW_OPERATION_MS", "30000")),
):
return _run_invoke_loop_impl(
agent,
messages,
config,
trace_metadata=trace_metadata,
trace_tenant_id=trace_tenant_id,
)


def _run_invoke_loop_impl(
agent,
messages,
config,
*,
trace_metadata: dict,
trace_tenant_id: str | None,
):
agent_loop_threshold = int(os.getenv("TRACE_AGENT_LOOP_ITERATIONS", "20"))
tool_loop_threshold = int(os.getenv("TRACE_AGENT_TOOL_LOOP_ITERATIONS", "20"))
outer_loop_count = 0
tool_loop_count = 0
model_invoke_attempt_count = 0
schema_retry_count = 0
model_responses = []
agent.messages = []
agent.messages.append(
Expand Down Expand Up @@ -92,6 +139,16 @@ def run_invoke_loop(agent, messages, config):
structured_response = None
_schema_retry_count = 0
while True:
outer_loop_count += 1
tracer.expensive_loop(
"agent.invoke_loop.outer_loop",
outer_loop_count,
service="brainapi-agent",
operation="invoke_loop",
tenant_id=trace_tenant_id,
metadata=trace_metadata,
threshold=agent_loop_threshold,
)
_did_retry_recovered_tool_call = False
while n_message is None:
_did_retry_unknown_finish = False
Expand Down Expand Up @@ -120,9 +177,21 @@ def run_invoke_loop(agent, messages, config):
else:
_invoke_attempts = 3
for _invoke_attempt in range(_invoke_attempts):
model_invoke_attempt_count += 1
try:
_n_message = agent.model.invoke(agent.messages, config)
except Exception as _invoke_exc:
tracer.exception(
"agent.model.invoke.failed",
_invoke_exc,
service="brainapi-agent",
operation="model.invoke",
tenant_id=trace_tenant_id,
metadata={
**trace_metadata,
"attempt": _invoke_attempt + 1,
},
)
_exc_str = str(_invoke_exc).lower()
if agent._tools_bound and (
"invalid message format" in _exc_str
Expand Down Expand Up @@ -355,6 +424,19 @@ def run_invoke_loop(agent, messages, config):
_did_retry_recovered_next_tool_call = False
_last_called_tool_name = tool_name
while True:
tool_loop_count += 1
tracer.expensive_loop(
"agent.invoke_loop.tool_loop",
tool_loop_count,
service="brainapi-agent",
operation="tool_loop",
tenant_id=trace_tenant_id,
metadata={
**trace_metadata,
"last_tool_name": _last_called_tool_name,
},
threshold=tool_loop_threshold,
)
_next_attempts = 3
for _next_attempt in range(_next_attempts):
if (
Expand Down Expand Up @@ -386,6 +468,18 @@ def run_invoke_loop(agent, messages, config):
try:
next_response = agent.model.invoke(agent.messages, config)
except Exception as _next_exc:
tracer.exception(
"agent.model.next_invoke.failed",
_next_exc,
service="brainapi-agent",
operation="model.next_invoke",
tenant_id=trace_tenant_id,
metadata={
**trace_metadata,
"attempt": _next_attempt + 1,
"last_tool_name": _last_called_tool_name,
},
)
_next_exc_str = str(_next_exc).lower()
if agent._tools_bound and (
"invalid message format" in _next_exc_str
Expand Down Expand Up @@ -739,6 +833,20 @@ def run_invoke_loop(agent, messages, config):
)
break
except Exception as e:
schema_retry_count += 1
tracer.error(
"agent.structured_output.parse_failed",
service="brainapi-agent",
operation="structured_output.parse",
tenant_id=trace_tenant_id,
severity=TraceSeverity.WARNING,
message=str(e),
metadata={
**trace_metadata,
"schema_retry_count": schema_retry_count,
"error_type": type(e).__name__,
},
)
if agent.debug:
print("[DEBUG (agent_base)]: ", type(e).__name__, e)
print(
Expand All @@ -761,6 +869,16 @@ def run_invoke_loop(agent, messages, config):
n_message = None
continue

tracer.expensive_loop(
"agent.model.invoke_attempts",
model_invoke_attempt_count,
service="brainapi-agent",
operation="model.invoke",
tenant_id=trace_tenant_id,
metadata=trace_metadata,
threshold=int(os.getenv("TRACE_AGENT_MODEL_INVOKE_ATTEMPTS", "10")),
)

try:
from langchain_core.tracers.langchain import wait_for_all_tracers

Expand Down
28 changes: 28 additions & 0 deletions src/lib/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from src.lib.tracing.events import (
TraceEvent,
TraceEventType,
TraceSeverity,
)
from src.lib.tracing.runtime import (
HealthProbe,
RuntimeMonitor,
start_runtime_monitoring,
stop_runtime_monitoring,
)
from src.lib.tracing.subscribers import TraceSubscriber
from src.lib.tracing.tracker import LocalTraceQueue, TraceTracker, trace_tracker, tracer

__all__ = [
"HealthProbe",
"LocalTraceQueue",
"RuntimeMonitor",
"TraceEvent",
"TraceEventType",
"TraceSeverity",
"TraceSubscriber",
"TraceTracker",
"start_runtime_monitoring",
"stop_runtime_monitoring",
"trace_tracker",
"tracer",
]
62 changes: 62 additions & 0 deletions src/lib/tracing/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from dataclasses import dataclass, field
from enum import Enum
import time
from typing import Any


class TraceEventType(str, Enum):
ERROR = "error"
EXCEPTION = "exception"
DOWNTIME = "downtime"
EXPENSIVE_LOOP = "expensive_loop"
SLA_BREACH = "sla_breach"
LATENCY = "latency"
HEARTBEAT = "heartbeat"
HEALTH_CHECK = "health_check"
RESOURCE_SAMPLE = "resource_sample"
PROCESS = "process"


class TraceSeverity(str, Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"


@dataclass(frozen=True)
class TraceEvent:
event_type: TraceEventType
name: str
severity: TraceSeverity = TraceSeverity.INFO
service: str | None = None
operation: str | None = None
tenant_id: str | None = None
trace_id: str | None = None
duration_ms: float | None = None
threshold_ms: float | None = None
status_code: int | None = None
error_type: str | None = None
message: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
stack_trace: str | None = None
created_at: float = field(default_factory=time.time)

def to_dict(self) -> dict[str, Any]:
return {
"event_type": self.event_type.value,
"name": self.name,
"severity": self.severity.value,
"service": self.service,
"operation": self.operation,
"tenant_id": self.tenant_id,
"trace_id": self.trace_id,
"duration_ms": self.duration_ms,
"threshold_ms": self.threshold_ms,
"status_code": self.status_code,
"error_type": self.error_type,
"message": self.message,
"metadata": dict(self.metadata),
"stack_trace": self.stack_trace,
"created_at": self.created_at,
}
Loading