One line to instrument your agent and capture every event in an immutable, queryable audit trail.
Open-source SDK implementing chaukas-spec for standardized agent instrumentation
Quick Start • Documentation • Examples • chaukas-spec • Community
Building AI agents is hard. Understanding what they're doing is harder.
Chaukas SDK is an open-source SDK that implements the chaukas-spec, a standardized event schema for AI agent instrumentation. It gives you X-ray vision into your AI agents with zero configuration:
import chaukas
chaukas.enable_chaukas() # That's it. You're done.
# Your existing agent code works unchanged
agent = Agent(name="assistant", model="gpt-4")
result = await agent.run(messages=[...])
Instantly get:
- Complete execution traces with distributed tracing
- Automatic retry detection and tracking (CrewAI, LangChain)
- Tool call monitoring and performance metrics
- Multi-agent handoff visualization
- Error tracking with full context
- LLM token usage and cost tracking
- Policy enforcement and compliance logs
- Beautiful, queryable event streams
| Feature | Chaukas | Traditional APM | Manual Logging |
|---|---|---|---|
| Setup Time | 1 line | Hours | Days |
| Code Changes | Zero | Extensive | Everywhere |
| Agent-Native | 100% | Adapted | Custom |
| Event Coverage | 19/19 chaukas-spec | Up to you | |
| Standardized Schema | chaukas-spec | Proprietary | None |
| Multi-Agent Tracking | Built-in | Manual | Complex |
| MCP Protocol | Native | No support | Manual |
| Distributed Tracing | Automatic | Hard | |
| Type Safety | Full | None | |
pip install chaukas-sdk
Set your environment variables (or pass them programmatically):
export CHAUKAS_TENANT_ID="your-tenant"
export CHAUKAS_PROJECT_ID="your-project"
export CHAUKAS_ENDPOINT="https://api.chaukas.ai"
export CHAUKAS_API_KEY="your-api-key"
import chaukas
import asyncio
# The OpenAI Agents SDK (pip install openai-agents) is imported as `agents`
from agents import Agent, Runner
# Enable instrumentation
chaukas.enable_chaukas()
# Your code works exactly as before
agent = Agent(
    name="data-analyst",
    instructions="You are a helpful data analyst.",
    model="gpt-4o",
)
async def main():
    result = await Runner.run(agent, "Analyze Q4 revenue")
    print(result.final_output)
asyncio.run(main())
# Chaukas automatically captures:
# - Session start/end
# - Agent lifecycle
# - LLM invocations with tokens
# - Tool calls and results
# - Errors (18/19 event types; RETRY not supported, see below)
# - Policy decisions
# - State changes
Note: OpenAI Agents SDK captures 18/19 event types (94.7%). RETRY events cannot be captured because the OpenAI SDK performs retries internally within its HTTP client layer, making them invisible to external instrumentation. CrewAI and LangChain do support RETRY detection; see the framework table for the overall coverage of each integration.
import chaukas
from crewai import Agent, Task, Crew, Process
chaukas.enable_chaukas()
# Define your crew
researcher = Agent(
    role="Senior Research Analyst",
    goal="Uncover cutting-edge developments in AI",
    backstory="You're an expert at finding insights",
    verbose=True
)
task = Task(
    description="Research latest AI trends",
    agent=researcher,
    expected_output="A comprehensive report"
)
crew = Crew(
    agents=[researcher],
    tasks=[task],
    process=Process.sequential
)
# Full observability out of the box
result = crew.kickoff()
import chaukas
from adk import Agent
chaukas.enable_chaukas()
agent = Agent(name="assistant")
response = agent.run("Hello!")
import chaukas
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
chaukas.enable_chaukas()
# Your LangChain code works unchanged
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"topic": "programming"})
# Chaukas automatically captures:
# - Session start/end
# - Chain lifecycle
# - LLM invocations with tokens
# - Tool calls (if using agents)
# - RAG operations (retriever tracking)
# - Errors and retries
Chaukas SDK implements the chaukas-spec, a standardized event schema with 19 event types for AI agent observability.
| Framework | Version | Events | Status | Notes |
|---|---|---|---|---|
| OpenAI Agents | >=0.5.0,<1.0.0 | 18/19 | Production | Session mgmt, MCP protocol, policy tracking, state updates, errors |
| CrewAI | >=1.4.1,<2.0.0 | 19/19 | Production | Event bus integration, multi-agent handoffs, knowledge sources, guardrails, flows |
| LangChain | >=0.1.0,<2.0 | 18/19 | Production | Runnable method patching, chains, agents, tools, RAG, retriever tracking |
| Google ADK | Latest | 5/19 | Under Construction | Basic agent & LLM tracking |
Coming Soon: LangGraph, AutoGen, Microsoft Semantic Kernel
All frameworks implementing the complete chaukas-spec capture all 19 event types
The chaukas-spec defines 19 standardized event types for AI agent observability. Chaukas SDK captures them automatically, subject to the per-framework coverage listed in the table above:
SESSION_START # User session begins
SESSION_END # Session completes
AGENT_START # Agent begins execution
AGENT_END # Agent finishes
AGENT_HANDOFF # Control transfers between agents
MODEL_INVOCATION_START # LLM call initiated
MODEL_INVOCATION_END # LLM responds (includes tokens, cost)
TOOL_CALL_START # Tool execution begins
TOOL_CALL_END # Tool completes with result
MCP_CALL_START # Model Context Protocol call starts
MCP_CALL_END # MCP operation completes
INPUT_RECEIVED # User input captured
OUTPUT_EMITTED # Agent output generated
ERROR # Error with full context
RETRY # Automatic retry detected (rate limits, timeouts)
POLICY_DECISION # Content filtering, guardrails enforced
DATA_ACCESS # Knowledge base, file, or API access
STATE_UPDATE # Agent configuration changes
SYSTEM_EVENT # Framework initialization, shutdown
Every event includes full trace context:
{
  "event_id": "019a6700-adb9-718d-0bc9-0000415845aa",
  "session_id": "019a6700-adb7-7a30-a548-000077453f71",
  "trace_id": "019a6700-adb7-7ef3-1e46-0000ae993c28",
  "span_id": "019a6700-adb9-706a-0a26-000073699939",
  "parent_span_id": "019a6700-adb7-7b27-1858-0000ee8d895b",
  "type": "EVENT_TYPE_TOOL_CALL_END",
  "agent_id": "data-analyst",
  "timestamp": "2025-01-08T12:34:56.789Z"
}
Visualize complete request flows across:
- Multiple agents
- LLM calls
- Tool invocations
- External API calls
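As a rough illustration of how the trace context can be used offline, the sketch below groups events from a JSONL export by `trace_id` and orders them by `timestamp`. It is not part of the SDK: the helper names are hypothetical, the sample records are invented, and the `EVENT_TYPE_AGENT_START` name is assumed by analogy with the `EVENT_TYPE_TOOL_CALL_END` value in the event shown above.

```python
import json
from collections import Counter, defaultdict


def load_events(lines):
    """Parse JSONL lines into event dicts, skipping blank lines."""
    return [json.loads(line) for line in lines if line.strip()]


def group_by_trace(events):
    """Map each trace_id to its events, ordered by timestamp."""
    traces = defaultdict(list)
    for event in events:
        traces[event["trace_id"]].append(event)
    for trace_events in traces.values():
        # UTC ISO-8601 timestamps with equal precision sort correctly as strings.
        trace_events.sort(key=lambda e: e["timestamp"])
    return dict(traces)


# Hypothetical sample data using the field names from the event shown above.
sample_lines = [
    '{"trace_id": "t1", "span_id": "s2", "type": "EVENT_TYPE_TOOL_CALL_END", "timestamp": "2025-01-08T12:34:56.789Z"}',
    '{"trace_id": "t1", "span_id": "s1", "type": "EVENT_TYPE_AGENT_START", "timestamp": "2025-01-08T12:34:55.000Z"}',
    '{"trace_id": "t2", "span_id": "s3", "type": "EVENT_TYPE_AGENT_START", "timestamp": "2025-01-08T12:35:10.000Z"}',
]

events = load_events(sample_lines)
traces = group_by_trace(events)
type_counts = Counter(event["type"] for event in events)

for trace_id, trace_events in traces.items():
    print(trace_id, [e["type"] for e in trace_events])
```

With a real export, `sample_lines` would be replaced by the lines of the events file written in file mode.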
Chaukas automatically detects and tracks retries:
# Your code
try:
    result = await agent.run(messages)
except RateLimitError:
    await asyncio.sleep(2)  # Exponential backoff
    result = await agent.run(messages)  # Retry
# Chaukas captures:
# 1. ERROR event (rate limit)
# 2. RETRY event (attempt 1, exponential strategy, 2000ms delay)
# 3. MODEL_INVOCATION_START (retry attempt)
# 4. MODEL_INVOCATION_END (success)
Only SDK with native MCP instrumentation:
from agents import Agent
from agents.mcp import MCPServerStreamableHttp
# MCP server setup (connection settings go in `params`)
mcp_server = MCPServerStreamableHttp(
    params={"url": "http://localhost:8000"},
    name="documentation-server",
)
agent = Agent(
    name="doc-agent",
    model="gpt-4o",
    mcp_servers=[mcp_server],
)
# Chaukas captures:
# - MCP_CALL_START (get_prompt request)
# - MCP_CALL_END (prompt retrieved, 245ms)
# - Full request/response payloads
Monitor content filtering and guardrails:
# When OpenAI filters content
response = await agent.run(messages)
# Chaukas automatically captures:
{
  "type": "EVENT_TYPE_POLICY_DECISION",
  "policy_id": "openai_content_policy",
  "outcome": "blocked",
  "rule_ids": ["content_filter"],
  "rationale": "Response blocked due to: content_filter",
  "finish_reason": "content_filter"
}
Track agent configuration changes:
# Agent configuration updated
agent.temperature = 0.7
agent.instructions = "Be more creative"
# Chaukas captures the diff:
{
  "type": "EVENT_TYPE_STATE_UPDATE",
  "state_update": {
    "temperature": {"old": 0.3, "new": 0.7},
    "instructions": {
      "old": "Be precise",
      "new": "Be more creative"
    }
  }
}
Visualize agent collaboration:
# CrewAI agent handoff
task.context = [previous_task]
# Chaukas captures:
{
  "type": "EVENT_TYPE_AGENT_HANDOFF",
  "from_agent_id": "researcher",
  "to_agent_id": "writer",
  "handoff_reason": "task_delegation",
  "context_data": {...}
}
CHAUKAS_TENANT_ID # Your tenant identifier
CHAUKAS_PROJECT_ID # Your project identifier
CHAUKAS_ENDPOINT # API endpoint (api mode)
CHAUKAS_API_KEY # Authentication key (api mode)
CHAUKAS_OUTPUT_MODE="api" # "api" or "file"
CHAUKAS_OUTPUT_FILE="events.jsonl" # File path (file mode)
CHAUKAS_BATCH_SIZE=20 # Events per batch
CHAUKAS_MAX_BATCH_BYTES=262144 # Max batch size (256KB)
CHAUKAS_FLUSH_INTERVAL=5.0 # Auto-flush interval (seconds)
CHAUKAS_TIMEOUT=30.0 # Request timeout (seconds)
CHAUKAS_BRANCH="main" # Git branch for context
CHAUKAS_TAGS="prod,us-east-1" # Custom tags
CREWAI_DISABLE_TELEMETRY=true # Disable CrewAI's telemetry
import chaukas
chaukas.enable_chaukas(
    tenant_id="acme-corp",
    project_id="production",
    endpoint="https://observability.acme.com",
    api_key="sk-proj-...",
    session_id="custom-session-123",  # Optional custom session
    config={
        "auto_detect": True,  # Auto-detect installed SDKs
        "enabled_integrations": [  # Or specify explicitly
            "openai_agents",
            "crewai"
        ],
        "batch_size": 20,  # Default batch size
        "flush_interval": 10.0,
        "timeout": 60.0,
    }
)
Perfect for local development and testing:
import os
os.environ["CHAUKAS_OUTPUT_MODE"] = "file"
os.environ["CHAUKAS_OUTPUT_FILE"] = "agent_events.jsonl"
import chaukas
chaukas.enable_chaukas()
# Events written to agent_events.jsonl
# Analyze with: cat agent_events.jsonl | jq .type | sort | uniq -c
import chaukas
chaukas.enable_chaukas()
# Run your agent
result = await agent.run(messages)
# Query events:
# cat events.jsonl | jq 'select(.type=="EVENT_TYPE_MODEL_INVOCATION_END") | .model_invocation.usage'
# Output:
{
  "prompt_tokens": 234,
  "completion_tokens": 456,
  "total_tokens": 690,
  "estimated_cost_usd": 0.0207
}
import chaukas
from crewai import Agent, Task, Crew, Process
chaukas.enable_chaukas()
# Define a multi-agent crew
# CrewAI requires `backstory` on agents and `expected_output` on tasks;
# the values below are placeholders.
researcher = Agent(
    role="Researcher",
    goal="Find insights",
    backstory="An analyst who tracks AI research",
)
writer = Agent(
    role="Writer",
    goal="Write report",
    backstory="A technical writer who turns findings into reports",
)
research_task = Task(
    description="Research AI trends",
    expected_output="A list of key AI trends",
    agent=researcher,
)
writing_task = Task(
    description="Write report",
    expected_output="A short written report",
    agent=writer,
    context=[research_task],  # Handoff point
)
crew = Crew(agents=[researcher, writer], tasks=[research_task, writing_task])
result = crew.kickoff()
# Chaukas captures:
# 1. SESSION_START
# 2. AGENT_START (researcher)
# 3. MODEL_INVOCATION_* (researcher's LLM calls)
# 4. AGENT_END (researcher)
# 5. AGENT_HANDOFF (researcher -> writer)
# 6. AGENT_START (writer)
# 7. MODEL_INVOCATION_* (writer's LLM calls)
# 8. AGENT_END (writer)
# 9. SESSION_END
import chaukas
import asyncio
import time
# The OpenAI Agents SDK (pip install openai-agents) is imported as `agents`
from agents import Agent, Runner, function_tool
chaukas.enable_chaukas()
@function_tool
def search_database(query: str) -> str:
    """Search the product database."""
    time.sleep(2)  # Simulate a slow database query
    return f"Results for: {query}"
agent = Agent(
    name="support-agent",
    model="gpt-4o",
    tools=[search_database],
)
async def main():
    result = await Runner.run(agent, "Find product XYZ")
    print(result.final_output)
asyncio.run(main())
# Chaukas captures tool performance:
# TOOL_CALL_START -> TOOL_CALL_END
# Duration: 2.1s (flag for optimization!)
import chaukas
chaukas.enable_chaukas()
# Your code encounters rate limits
MAX_ATTEMPTS = 5
for attempt in range(MAX_ATTEMPTS):
    try:
        result = await agent.run(messages)
        break  # Success: stop retrying
    except RateLimitError:
        # Capped exponential backoff: 1s, 2s, 4s, ... up to 30s
        await asyncio.sleep(min(2 ** attempt, 30))
# Query retry events:
# cat events.jsonl | jq 'select(.type=="EVENT_TYPE_RETRY")'
# Output shows patterns:
# - 15 retries in last hour
# - Average backoff: 4.2s
# - All due to rate limits (429)
# -> Action: Implement request throttling
+-------------------------------------------------------------+
|                      Your Application                       |
|                                                             |
|     +-------------+         +-------------+                 |
|     |   OpenAI    |         |   CrewAI    |                 |
|     |   Agent     |         |    Crew     |                 |
|     +------+------+         +------+------+                 |
|            |                       |                        |
|            +-----------+-----------+                        |
|                        |                                    |
|            +-----------v-----------+                        |
|            |      Chaukas SDK      | (Monkey patching)      |
|            |  - Auto-detection     |                        |
|            |  - Event capture      |                        |
|            |  - Distributed trace  |                        |
|            +-----------+-----------+                        |
|                        |                                    |
+------------------------+------------------------------------+
                         |
             +-----------v-----------+
             | Intelligent Batching  |
             |  - Adaptive sizing    |
             |  - Auto-retry         |
             |  - Memory-efficient   |
             +-----------+-----------+
                         |
             +-----------v-----------+
             |     Transmission      |
             |  - gRPC (API mode)    |
             |  - File (Dev mode)    |
             +-----------+-----------+
                         |
             +-----------v-----------+
             |   Chaukas Platform    |
             |  - Storage            |
             |  - Querying           |
             |  - Visualization      |
             +-----------------------+
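To make the "monkey patching" layer in the diagram more concrete, here is a minimal sketch of the general technique: a method on an existing class is replaced by a wrapper that emits events before and after the original call. This is an illustration only, not the SDK's actual implementation; `instrument`, `emit`, `captured`, and `ToyAgent` are hypothetical names.

```python
import functools

captured = []  # Stand-in for an event buffer (hypothetical)


def emit(event_type, **fields):
    """Record an event dict; a real SDK would batch and transmit it."""
    captured.append({"type": event_type, **fields})


def instrument(cls, method_name):
    """Replace cls.<method_name> with a wrapper that emits lifecycle events."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def wrapper(self, *args, **kwargs):
        emit("AGENT_START", agent_id=self.name)
        try:
            return original(self, *args, **kwargs)
        except Exception as exc:
            emit("ERROR", agent_id=self.name, message=str(exc))
            raise
        finally:
            emit("AGENT_END", agent_id=self.name)

    setattr(cls, method_name, wrapper)


class ToyAgent:
    """Toy class standing in for a framework's agent type."""

    def __init__(self, name):
        self.name = name

    def run(self, prompt):
        return f"{self.name} says: {prompt}"


instrument(ToyAgent, "run")
result = ToyAgent("assistant").run("hello")
print(result)
print([event["type"] for event in captured])
```

Because the wrapper is installed on the class, calling code does not change, which is the property the one-line setup relies on.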
Agent.run() called
|
+-- SESSION_START (first call)
|
+-- AGENT_START
|
+-- INPUT_RECEIVED (user message)
|
+-- MODEL_INVOCATION_START
|   |
|   +-- [LLM processes]
|
+-- MODEL_INVOCATION_END (with tokens)
|
+-- TOOL_CALL_START (if tools requested)
|   |
|   +-- [Tool executes]
|
+-- TOOL_CALL_END (with result)
|
+-- OUTPUT_EMITTED (agent response)
|
+-- AGENT_END
|
+-- SESSION_END (on cleanup)
Session (lifetime of user interaction)
|
+-- Trace (single request/response)
|   |
|   +-- Agent Span (agent execution)
|   |   |
|   |   +-- LLM Span (model call)
|   |   |
|   |   +-- Tool Span (tool execution)
|   |   |   |
|   |   |   +-- MCP Span (MCP protocol call)
|   |   |
|   |   +-- Tool Span (another tool)
|   |
|   +-- Agent Span (handoff to second agent)
|       |
|       +-- LLM Span
|
+-- Trace (follow-up request)
    +-- ...
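The hierarchy above can be rebuilt from captured events using the `span_id` and `parent_span_id` fields. The sketch below is one way to do that under stated assumptions: start and end events of the same operation are assumed to share a `span_id`, the sample IDs are invented, and `build_span_tree` and `render` are hypothetical helpers rather than SDK functions.

```python
from collections import defaultdict


def build_span_tree(events):
    """Return (roots, children, labels) derived from span_id / parent_span_id."""
    parent_of = {}
    labels = {}
    for event in events:
        span_id = event["span_id"]
        # Several events (start/end) may share a span; keep the first one seen.
        parent_of.setdefault(span_id, event.get("parent_span_id"))
        labels.setdefault(span_id, event["type"])
    children = defaultdict(list)
    roots = []
    for span_id, parent in parent_of.items():
        if parent in parent_of:
            children[parent].append(span_id)
        else:
            roots.append(span_id)  # No known parent: top of a trace
    return roots, dict(children), labels


def render(span_id, children, labels, depth=0):
    """Return indented lines for a span and all of its descendants."""
    lines = ["  " * depth + labels[span_id]]
    for child in children.get(span_id, []):
        lines.extend(render(child, children, labels, depth + 1))
    return lines


# Hypothetical events for one agent run with an LLM call and an MCP-backed tool.
span_events = [
    {"span_id": "agent-1", "parent_span_id": None, "type": "EVENT_TYPE_AGENT_START"},
    {"span_id": "llm-1", "parent_span_id": "agent-1", "type": "EVENT_TYPE_MODEL_INVOCATION_START"},
    {"span_id": "llm-1", "parent_span_id": "agent-1", "type": "EVENT_TYPE_MODEL_INVOCATION_END"},
    {"span_id": "tool-1", "parent_span_id": "agent-1", "type": "EVENT_TYPE_TOOL_CALL_START"},
    {"span_id": "mcp-1", "parent_span_id": "tool-1", "type": "EVENT_TYPE_MCP_CALL_START"},
]

roots, children, labels = build_span_tree(span_events)
tree_lines = [line for root in roots for line in render(root, children, labels)]
print("\n".join(tree_lines))
```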
- Track agent reliability and uptime
- Monitor LLM token costs in real-time
- Detect performance regressions
- Alert on error spikes
- Reproduce issues with full trace context
- Understand agent decision-making
- Optimize tool execution performance
- Test multi-agent workflows
- Immutable audit trail of all interactions
- Track policy enforcement decisions
- Monitor data access patterns
- Generate compliance reports
- Identify expensive LLM calls
- Track token usage by agent/model
- Find opportunities for caching
- Optimize prompt engineering
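For the cost-tracking use cases, token usage can be aggregated per agent from `MODEL_INVOCATION_END` events. The following is a small sketch, assuming each such event carries both an `agent_id` and a `model_invocation.usage` object with the fields shown in the usage query example; the helper name and the sample numbers are hypothetical.

```python
from collections import defaultdict


def usage_by_agent(events):
    """Sum total_tokens and estimated_cost_usd per agent_id."""
    totals = defaultdict(lambda: {"total_tokens": 0, "estimated_cost_usd": 0.0})
    for event in events:
        if event.get("type") != "EVENT_TYPE_MODEL_INVOCATION_END":
            continue  # Only model completions carry usage data
        usage = event.get("model_invocation", {}).get("usage", {})
        bucket = totals[event.get("agent_id", "unknown")]
        bucket["total_tokens"] += usage.get("total_tokens", 0)
        bucket["estimated_cost_usd"] += usage.get("estimated_cost_usd", 0.0)
    return dict(totals)


# Hypothetical events; only the fields used by the aggregation are included.
usage_events = [
    {"type": "EVENT_TYPE_MODEL_INVOCATION_END", "agent_id": "researcher",
     "model_invocation": {"usage": {"total_tokens": 690, "estimated_cost_usd": 0.0207}}},
    {"type": "EVENT_TYPE_MODEL_INVOCATION_END", "agent_id": "researcher",
     "model_invocation": {"usage": {"total_tokens": 310, "estimated_cost_usd": 0.0093}}},
    {"type": "EVENT_TYPE_MODEL_INVOCATION_END", "agent_id": "writer",
     "model_invocation": {"usage": {"total_tokens": 500, "estimated_cost_usd": 0.015}}},
    {"type": "EVENT_TYPE_TOOL_CALL_END", "agent_id": "writer"},
]

report = usage_by_agent(usage_events)
for agent_id, totals in sorted(report.items()):
    print(agent_id, totals["total_tokens"], round(totals["estimated_cost_usd"], 4))
```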
Chaukas implements intelligent batching to optimize performance:
+-----------------------------------------+
|              Event Buffer               |
|                                         |
|  Events accumulate until:               |
|  • batch_size reached (default: 20)     |
|  • max_batch_bytes reached (256KB)      |
|  • flush_interval elapsed (5s)          |
+--------------------+--------------------+
                     |
                     v
            +-----------------+
            | Send to Server  |
            +--------+--------+
                     |
                 Success? ---Yes---> Done
                     |
                 No (503)
                     |
                     v
          +---------------------+
          | Split batch in half |
          | Retry both halves   |
          +---------------------+
- Overhead: < 1% CPU impact
- Memory: ~10MB for 1000 events buffered
- Latency: < 5ms per event capture
- Network: Batched transmission reduces API calls by 95%
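The batching behavior in the diagram can be sketched in a few lines. This is an illustration of the described rules (flush on `batch_size`, `max_batch_bytes`, or `flush_interval`; split a rejected batch in half and retry both halves), not the SDK's code. `EventBuffer`, `send_with_split`, and `fake_send` are hypothetical names, and in this simplified version the interval is only checked when an event is added rather than by a background timer.

```python
import json
import time


def send_with_split(send, batch):
    """Send a batch; if it is rejected, split it in half and retry both halves."""
    if send(batch):
        return
    if len(batch) == 1:
        raise RuntimeError("server rejected a single-event batch")
    mid = len(batch) // 2
    send_with_split(send, batch[:mid])
    send_with_split(send, batch[mid:])


class EventBuffer:
    def __init__(self, send, batch_size=20, max_batch_bytes=262_144,
                 flush_interval=5.0, clock=time.monotonic):
        self._send = send
        self.batch_size = batch_size
        self.max_batch_bytes = max_batch_bytes
        self.flush_interval = flush_interval
        self._clock = clock
        self._events = []
        self._bytes = 0
        self._last_flush = clock()

    def add(self, event):
        """Buffer an event and flush when any of the three limits is hit."""
        self._events.append(event)
        self._bytes += len(json.dumps(event).encode("utf-8"))
        if (len(self._events) >= self.batch_size
                or self._bytes >= self.max_batch_bytes
                or self._clock() - self._last_flush >= self.flush_interval):
            self.flush()

    def flush(self):
        batch, self._events, self._bytes = self._events, [], 0
        self._last_flush = self._clock()
        if batch:
            send_with_split(self._send, batch)


accepted = []


def fake_send(batch):
    """Pretend server that rejects any batch with more than two events."""
    if len(batch) > 2:
        return False
    accepted.append(list(batch))
    return True


buffer = EventBuffer(fake_send, batch_size=5)
for i in range(5):
    buffer.add({"event_id": i})
print([len(batch) for batch in accepted])
```

Halving on rejection keeps every event while converging on a batch size the server will accept, which is why a persistent failure usually points at the configured limits rather than at individual events.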
# High-volume production (optimize throughput)
chaukas.enable_chaukas(config={
    "batch_size": 200,
    "max_batch_bytes": 1_048_576,  # 1MB
    "flush_interval": 30.0
})
# Real-time debugging (optimize latency)
chaukas.enable_chaukas(config={
    "batch_size": 1,
    "flush_interval": 0.1
})
# Memory-constrained (optimize memory)
chaukas.enable_chaukas(config={
    "batch_size": 10,
    "max_batch_bytes": 65536,  # 64KB
    "flush_interval": 2.0
})
Problem: Seeing "Transient error Service Unavailable" when using CrewAI
Cause: CrewAI's built-in telemetry is trying to send data to CrewAI's servers
Solution:
export CREWAI_DISABLE_TELEMETRY=true
This only disables CrewAI's telemetry. Chaukas continues capturing events normally.
Problem: No events in output file or API
Solution:
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("chaukas.sdk").setLevel(logging.DEBUG)
# Verify configuration
import chaukas
chaukas.enable_chaukas()
print(chaukas.get_config()) # Check settings
# Force flush before exit
chaukas.flush()
chaukas.disable_chaukas()
Problem: Memory consumption increasing over time
Cause: Large batches accumulating
Solution:
# Reduce batch size and increase flush frequency
chaukas.enable_chaukas(config={
    "batch_size": 10,
    "max_batch_bytes": 65536,
    "flush_interval": 1.0
})
Problem: Server returning "high memory" errors
Cause: Batches too large
Solution: SDK automatically splits batches and retries. If persistent:
chaukas.enable_chaukas(config={
    "max_batch_bytes": 131072,  # Reduce to 128KB
    "batch_size": 10  # Fewer events per batch than the default of 20
})
- chaukas-spec - Standardized event schema (19 event types)
- Examples Repository - Complete working examples for all supported frameworks
- OpenAI Examples - OpenAI Agents integration examples and guides
- CrewAI Examples - CrewAI integration examples and guides
- LangChain Examples - LangChain integration examples and guides
- Google ADK Examples - Google ADK integration examples
git clone https://github.com/chaukasai/chaukas-sdk
cd chaukas-sdk
pip install -e ".[dev]"
# All tests
pytest
# With coverage
pytest --cov=chaukas
# Specific test file
pytest tests/test_openai_events.py -v
# Watch mode
pytest-watch
# Format code
black src/ tests/ examples/
# Sort imports
isort src/ tests/ examples/
# Type checking
mypy src/chaukas/
# Run all checks
make lint
# OpenAI Agents example
python examples/openai/openai_comprehensive_example.py
# CrewAI example
python examples/crewai/crewai_example.py
# Analyze captured events
cat events.jsonl | jq .type | sort | uniq -c
We welcome contributions from the community! Whether you're:
- Reporting bugs
- Requesting features
- Improving documentation
- Contributing code
- Asking questions
Please read our Contributing Guide for detailed guidelines on:
- Setting up your development environment
- Coding standards and best practices
- Testing requirements
- Pull request process
- Fork and clone the repository
- Install dependencies: pip install -e ".[dev]"
- Make your changes following our coding standards
- Run tests: make test && make lint
- Submit a PR using our PR template
Found a bug or have a feature request? Please use our issue templates.
Please follow our Code of Conduct to keep our community welcoming and inclusive.
For security vulnerabilities, please see our Security Policy.
- GitHub Discussions - Ask questions, share ideas
- GitHub Issues - Bug reports and feature requests
- Email - Direct support
- Examples - Working code examples and guides
Apache 2.0 License - see LICENSE file for details
Built with ❤️ by the Chaukas team
Website • chaukas-spec • GitHub