diff --git a/README.md b/README.md index 377fb37..367defb 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ python -m pipeline \ --tasks file_property/size_classification ``` -Results are saved to `./results/{exp_name}/{mcp}__{model}/{task}` (in this example `./results/test-run/filesystem__gpt-5/file_property__size_classification`). +Results are saved to `./results/{exp_name}/{model}__{mcp}/run-*/...` (e.g., `./results/test-run/gpt-5__filesystem/run-1/...`). --- @@ -152,7 +152,7 @@ You can also follow `docs/quickstart.md` for the shortest end-to-end path. ## Results and metrics -- Results are written to `./results/` (JSON + CSV). +- Results are organized under `./results/{exp_name}/{model}__{mcp}/run-*/` (JSON + CSV per task). - Generate a summary with: ```bash python -m src.aggregators.aggregate_results --exp-name exp ``` --- ## Model and Tasks -- See `docs/introduction.md` for models supported in MCPMark. +- **Model support**: MCPMark calls models via LiteLLM; see the [LiteLLM docs](https://docs.litellm.ai/docs/). For Anthropic (Claude) extended thinking mode (enabled via `--reasoning-effort`), we use Anthropic's native API. +- See `docs/introduction.md` for details and configuration of supported models in MCPMark. +- To add a new model, edit `src/model_config.py`. Before adding one, check that the model and provider are supported in the [LiteLLM docs](https://docs.litellm.ai/docs/). - Task design principles in `docs/datasets/task.md`. Each task ships with an automated `verify.py` for objective, reproducible evaluation; see `docs/task.md` for details. --- diff --git a/pipeline.py b/pipeline.py index 4b58fad..b6f442e 100644 --- a/pipeline.py +++ b/pipeline.py @@ -63,10 +63,10 @@ def main(): "--timeout", type=int, default=3600, help="Timeout in seconds for agent execution" ) parser.add_argument( - "--stream", - action="store_true", - default=False, - help="Use streaming execution (default: False, uses non-streaming)", + "--reasoning-effort", + default="default", + choices=["default", "minimal", "low", "medium", "high"], + help="Reasoning effort level for supported models (default: 'default')", ) # Output configuration @@ -113,12 +113,11 @@ def main(): logger.info(f"Starting Run {run_idx}/{args.k}") logger.info(f"{'=' * 80}\n") - # For k-runs, create run-N subdirectory + # For k-runs, results go under results/{exp}/{model}__{mcp}/run-N run_exp_name = f"run-{run_idx}" run_output_dir = args.output_dir / args.exp_name else: - # For single run (k=1), maintain backward compatibility - # Use run-1 subdirectory for consistency + # For single run, still use run-1 under {model}__{mcp} run_exp_name = "run-1" run_output_dir = args.output_dir / args.exp_name @@ -138,7 +137,7 @@ def main(): timeout=args.timeout, exp_name=run_exp_name, output_dir=run_output_dir, - stream=args.stream, + reasoning_effort=args.reasoning_effort, ) pipeline.run_evaluation(args.tasks) diff --git a/pyproject.toml b/pyproject.toml index b8bbfa9..75665cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,8 +14,13 @@ dependencies = [ "python-dotenv>=1.1.1,<2", "ruff>=0.12.4,<0.13", "psycopg2-binary>=2.9.10,<3", - "pyyaml>=6.0.2,<7" -, "nest-asyncio>=1.6.0,<2", "pixi", "pipx>=1.7.1,<2", "pgdumplib>=3.1.0,<4"] + "pyyaml>=6.0.2,<7", + "nest-asyncio>=1.6.0,<2", + "pixi", + "pipx>=1.7.1,<2", + "pgdumplib>=3.1.0,<4", + "litellm==1.76.0" +] [build-system] build-backend = "hatchling.build" diff --git a/requirements.txt b/requirements.txt index cb2cb49..64a6aa0 100644 ---
a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ matplotlib>=3.7.0 numpy>=1.23.0 psycopg2 pyyaml -nest_asyncio \ No newline at end of file +nest_asyncio +litellm==1.76.0 \ No newline at end of file diff --git a/src/agent.py b/src/agent.py deleted file mode 100644 index 0a1d6cf..0000000 --- a/src/agent.py +++ /dev/null @@ -1,1088 +0,0 @@ -""" -Unified Agent Implementation for MCPMark -========================================= - -This module provides a unified agent implementation that handles LLM and MCP server -management. The agent is responsible for: -- Model provider creation and management -- MCP server creation for different services -- LLM inference execution with streaming response -- Token usage tracking and statistics - -The agent does NOT handle task-specific logic - that's the responsibility of task managers. -""" - -# Python stdlib -import asyncio -import json -import os -import time -from typing import Any, Dict, Callable - -# Third-party dependencies -from agents import ( - Agent, - Model, - ModelProvider, - ModelSettings, - OpenAIChatCompletionsModel, - RunConfig, - Runner, - set_tracing_disabled, -) -from agents.exceptions import AgentsException -from agents.items import ItemHelpers -from openai import AsyncOpenAI -from openai.types.responses import ResponseTextDeltaEvent - -# MCP server classes (stdio & HTTP) from agents SDK -from agents.mcp.server import ( - MCPServerStdio, - MCPServerStreamableHttp, - MCPServerStreamableHttpParams, -) - -from src.logger import get_logger - -# Initialize logger -logger = get_logger(__name__) - - -def _apply_nest_asyncio(): - """Apply nest_asyncio to allow nested event loops.""" - import nest_asyncio - - nest_asyncio.apply() - - -# Apply nested asyncio support -_apply_nest_asyncio() - - -class MCPAgent: - """ - Unified agent for LLM and MCP server management. - - This agent handles the integration of: - - Model: LLM configuration (model name, API key, base URL) - - Agent Framework: Currently supports OpenAI Agents SDK - - Service: MCP service type (notion, github, postgres) - """ - - # Constants - MAX_TURNS = 100 - DEFAULT_TIMEOUT = 600 - DEFAULT_MAX_RETRIES = 3 - DEFAULT_RETRY_BACKOFF = 5.0 - - # Service categories - NPX_BASED_SERVICES = ["notion", "filesystem", "playwright", "playwright_webarena"] - PIPX_BASED_SERVICES = ["postgres"] - - def __init__( - self, - model_name: str, - api_key: str, - base_url: str, - mcp_service: str, - agent_framework: str = "openai_agents", - timeout: int = DEFAULT_TIMEOUT, - max_retries: int = DEFAULT_MAX_RETRIES, - retry_backoff: float = DEFAULT_RETRY_BACKOFF, - service_config: dict | None = None, - service_config_provider: "Callable[[], dict] | None" = None, - stream: bool = False, - ): - """ - Initialize the MCP agent. 
- - Args: - model_name: Name of the LLM model to use - api_key: API key for the model provider - base_url: Base URL for the model provider - mcp_service: MCP service type (notion, github, postgres) - agent_framework: Agent framework to use (default: openai_agents) - timeout: Execution timeout in seconds - max_retries: Maximum number of retries for transient errors - retry_backoff: Backoff time for retries in seconds - stream: Whether to use streaming execution (default: False) - """ - self.model_name = model_name - self.api_key = api_key - self.base_url = base_url - self.mcp_service = mcp_service - self.agent_framework = agent_framework - self.timeout = timeout - self.max_retries = max_retries - self.retry_backoff = retry_backoff - self.stream = stream - # Persisted service-specific configuration (e.g., notion_key, browser, test_directory) - self.service_config: dict[str, Any] = service_config or {} - # Store optional provider for dynamic config refresh - self._service_config_provider = service_config_provider - - # Initialize model provider - self.model_provider = self._create_model_provider() - - # Usage statistics - self._usage_stats = { - "total_input_tokens": 0, - "total_output_tokens": 0, - "total_tokens": 0, - "total_turns": 0, - "total_execution_time": 0.0, - "successful_executions": 0, - "failed_executions": 0, - } - - logger.debug( - f"Initialized MCPAgent for mcp service '{mcp_service}' with model '{model_name}'" - ) - # Disable tracing to avoid warnings and unnecessary uploads - set_tracing_disabled(True) - - @property - def _default_headers(self) -> dict: - """Get default headers for the model provider.""" - return { - "App-Code": "MCPMark", - "HTTP-Referer": "https://mcpmark.ai", - "X-Title": "MCPMark", - } - - def _create_model_provider(self) -> ModelProvider: - """Create and return a model provider for the specified model.""" - client = AsyncOpenAI( - base_url=self.base_url, - api_key=self.api_key, - default_headers=self._default_headers, - ) - agent_model_name = self.model_name # Capture the model name from the agent - - class CustomModelProvider(ModelProvider): - def get_model(self, model_name_override: str | None) -> Model: - final_model_name = model_name_override or agent_model_name - return OpenAIChatCompletionsModel( - model=final_model_name, openai_client=client - ) - - return CustomModelProvider() - - def _refresh_service_config(self) -> None: - """Refresh self.service_config from the provider, if one was supplied.""" - if self._service_config_provider is None: - return - try: - latest_cfg = self._service_config_provider() or {} - # New values override existing ones - self.service_config.update(latest_cfg) - except Exception as exc: - logger.warning("Failed to refresh service config via provider: %s", exc) - - # ========== Helper Methods ========== - - def _extract_token_usage(self, raw_responses) -> Dict[str, int]: - """Extract token usage from raw responses.""" - if not raw_responses: - return {} - - total_input = 0 - total_output = 0 - total = 0 - - for response in raw_responses: - if hasattr(response, "usage") and response.usage: - total_input += response.usage.input_tokens or 0 - total_output += response.usage.output_tokens or 0 - total += response.usage.total_tokens or 0 - - return { - "input_tokens": total_input, - "output_tokens": total_output, - "total_tokens": total, - } - - def _update_statistics(self, success: bool, token_usage: dict, turn_count: int, execution_time: float): - """Update usage statistics in a single place.""" - if success: - 
self._usage_stats["successful_executions"] += 1 - else: - self._usage_stats["failed_executions"] += 1 - - self._usage_stats["total_input_tokens"] += token_usage.get("input_tokens", 0) - self._usage_stats["total_output_tokens"] += token_usage.get("output_tokens", 0) - self._usage_stats["total_tokens"] += token_usage.get("total_tokens", 0) - self._usage_stats["total_turns"] += turn_count - self._usage_stats["total_execution_time"] += execution_time - - def _create_error_response(self, execution_time: float, error: str, - output: list = None, token_usage: dict = None, - turn_count: int = 0) -> Dict[str, Any]: - """Create standardized error response.""" - return { - "success": False, - "output": output or [], - "token_usage": token_usage or {}, - "turn_count": turn_count, - "execution_time": execution_time, - "error": error, - } - - def _create_success_response(self, output: list, token_usage: dict, - turn_count: int, execution_time: float) -> Dict[str, Any]: - """Create standardized success response.""" - return { - "success": True, - "output": output, - "token_usage": token_usage, - "turn_count": turn_count, - "execution_time": execution_time, - "error": None, - } - - async def _prepare_agent_execution(self, instruction: str): - """Common setup for both streaming and non-streaming execution.""" - self._refresh_service_config() - server = await self._create_mcp_server() - - agent = Agent( - name=f"{self.mcp_service.title()} Agent", - mcp_servers=[server] - ) - ModelSettings.tool_choice = "required" - - conversation = [{"content": instruction, "role": "user"}] - return server, agent, conversation - - def _needs_startup_delay(self) -> bool: - """Check if the service needs a startup delay.""" - return (self.mcp_service in self.NPX_BASED_SERVICES or - self.mcp_service in self.PIPX_BASED_SERVICES) - - async def _create_mcp_server(self) -> Any: - """Create and return an MCP server instance for the current service using self.service_config.""" - - # Add startup delay if needed - if self._needs_startup_delay(): - logger.debug(f"Adding startup delay for service: {self.mcp_service}") - await asyncio.sleep(5) - - # Dispatch to service-specific creator - service_creators = { - "notion": self._create_notion_server, - "github": self._create_github_server, - "filesystem": self._create_filesystem_server, - "playwright": self._create_playwright_server, - "playwright_webarena": self._create_playwright_webarena_server, - "postgres": self._create_postgres_server, - } - - creator = service_creators.get(self.mcp_service) - if not creator: - raise ValueError(f"Unsupported MCP service: {self.mcp_service}") - - return creator() - - def _create_notion_server(self) -> MCPServerStdio: - """Create Notion MCP server.""" - cfg = self.service_config - notion_key = cfg.get("notion_key") - - if not notion_key: - raise ValueError( - "Notion API key (notion_key) is required for Notion MCP server" - ) - - return MCPServerStdio( - params={ - "command": "npx", - "args": ["-y", "@notionhq/notion-mcp-server"], - "env": { - "OPENAPI_MCP_HEADERS": ( - '{"Authorization": "Bearer ' + notion_key + '", ' - '"Notion-Version": "2022-06-28"}' - ) - }, - }, - client_session_timeout_seconds=120, - cache_tools_list=True, - ) - - def _create_github_server(self) -> MCPServerStreamableHttp: - """Create GitHub MCP server.""" - cfg = self.service_config - github_token = cfg.get("github_token") - - if not github_token: - raise ValueError( - "GitHub token (github_token) is required for GitHub MCP server" - ) - - params = MCPServerStreamableHttpParams( 
- url="https://api.githubcopilot.com/mcp/", - headers={ - "Authorization": f"Bearer {github_token}", - "User-Agent": "MCPMark/1.0", - }, - timeout_seconds=30, - ) - - return MCPServerStreamableHttp( - params=params, - cache_tools_list=True, - name="GitHub MCP Server", - client_session_timeout_seconds=120, - ) - - def _create_filesystem_server(self) -> MCPServerStdio: - """Create Filesystem MCP server.""" - cfg = self.service_config - test_directory = cfg.get("test_directory") - - if not test_directory: - raise ValueError( - "filesystem service requires 'test_directory' in service_config" - ) - - return MCPServerStdio( - params={ - "command": "npx", - "args": [ - "-y", - "@modelcontextprotocol/server-filesystem", - str(test_directory), - ], - }, - client_session_timeout_seconds=120, - cache_tools_list=True, - ) - - def _create_playwright_server(self) -> MCPServerStdio: - """Create Playwright MCP server.""" - cfg = self.service_config - browser = cfg.get("browser", "chromium") - headless = cfg.get("headless", True) - viewport_width = cfg.get("viewport_width", 1280) - viewport_height = cfg.get("viewport_height", 720) - - args = ["-y", "@playwright/mcp@latest"] - if headless: - args.append("--headless") - args.append("--isolated") - args.append("--no-sandbox") # Required for Docker - args.extend([ - "--browser", browser, - "--viewport-size", f"{viewport_width},{viewport_height}", - ]) - - return MCPServerStdio( - params={ - "command": "npx", - "args": args, - }, - client_session_timeout_seconds=120, - cache_tools_list=True, - ) - - def _create_playwright_webarena_server(self) -> MCPServerStdio: - """Create Playwright WebArena MCP server.""" - cfg = self.service_config - # Same as playwright but with base_url support - browser = cfg.get("browser", "chromium") - headless = cfg.get("headless", True) - viewport_width = cfg.get("viewport_width", 1280) - viewport_height = cfg.get("viewport_height", 720) - - args = ["-y", "@playwright/mcp@latest"] - if headless: - args.append("--headless") - args.append("--isolated") - args.extend([ - "--browser", browser, - "--viewport-size", f"{viewport_width},{viewport_height}", - ]) - - return MCPServerStdio( - params={ - "command": "npx", - "args": args, - }, - client_session_timeout_seconds=120, - cache_tools_list=True, - ) - - def _create_postgres_server(self) -> MCPServerStdio: - """Create PostgreSQL MCP server.""" - cfg = self.service_config - host = cfg.get("host", "localhost") - port = cfg.get("port", 5432) - username = cfg.get("username") - password = cfg.get("password") - database = cfg.get("current_database") or cfg.get("database") - - if not all([username, password, database]): - raise ValueError( - "PostgreSQL service requires username, password, and database in service_config" - ) - - database_url = f"postgresql://{username}:{password}@{host}:{port}/{database}" - - return MCPServerStdio( - params={ - "command": "pipx", - "args": ["run", "postgres-mcp", "--access-mode=unrestricted"], - "env": { - "DATABASE_URI": database_url, - }, - }, - client_session_timeout_seconds=120, - cache_tools_list=True, - ) - - def _write_to_log_file(self, log_file_path: str, content: str): - """Write content to log file, creating directory if needed.""" - if log_file_path: - try: - import os - - os.makedirs(os.path.dirname(log_file_path), exist_ok=True) - with open(log_file_path, "a", encoding="utf-8") as f: - f.write(content) - except Exception as log_error: - logger.debug(f"Failed to write to log file: {log_error}") - - async def _execute_without_streaming( - self, 
instruction: str, tool_call_log_file: str = None - ) -> Dict[str, Any]: - """ - Execute instruction with agent using non-streaming (sync) response. - - Args: - instruction: The instruction/prompt to execute - tool_call_log_file: Optional path to log tool calls - - Returns: - Dictionary containing execution results - """ - start_time = time.time() - - try: - # Common setup - server, agent, conversation = await self._prepare_agent_execution(instruction) - - async with server: - # Run agent with non-streaming sync - result = Runner.run_sync( - agent, - max_turns=self.MAX_TURNS, - input=conversation, - run_config=RunConfig(model_provider=self.model_provider), - ) - - # Extract conversation output - conversation_output = result.to_input_list() - - # Log tool calls if requested - if tool_call_log_file: - self._log_tool_calls_from_conversation( - conversation_output, tool_call_log_file - ) - - # Extract token usage using helper - token_usage = self._extract_token_usage(result.raw_responses) - - # Print token usage if available - if token_usage: - logger.info( - f"\n| Token usage: Total: {token_usage['total_tokens']:,} | " - f"Input: {token_usage['input_tokens']:,} | " - f"Output: {token_usage['output_tokens']:,}" - ) - - # Extract turn count consistently (use current_turn like streaming) - turn_count = len(result.raw_responses if hasattr(result, "raw_responses") else []) - if turn_count: - logger.info(f"| Turns: {turn_count}") - - execution_time = time.time() - start_time - - # Update statistics and return success - self._update_statistics(True, token_usage, turn_count, execution_time) - return self._create_success_response( - conversation_output, token_usage, turn_count, execution_time - ) - - except Exception as e: - execution_time = time.time() - start_time - - conversation_output = [] - token_usage: Dict[str, int] = {} - turn_count = 0 - - # If this is an AgentsException with run_data, extract partials - if isinstance(e, AgentsException) and getattr(e, "run_data", None): - try: - rd = e.run_data # type: ignore[attr-defined] - # Reconstruct conversation similar to RunResult.to_input_list() - original_items = ItemHelpers.input_to_new_input_list(rd.input) - new_items = [item.to_input_item() for item in rd.new_items] - conversation_output = original_items + new_items - - # Prefer aggregated usage from context_wrapper - usage = getattr(getattr(rd, "context_wrapper", None), "usage", None) - if usage: - token_usage = { - "input_tokens": usage.input_tokens or 0, - "output_tokens": usage.output_tokens or 0, - "total_tokens": usage.total_tokens or 0, - } - else: - # Fallback: aggregate from raw_responses - token_usage = self._extract_token_usage(rd.raw_responses) - - # Completed turns are the number of model_responses collected - turn_count = len(getattr(rd, "raw_responses", []) or []) - except Exception as extract_err: - logger.debug(f"Failed to extract run_data on error: {extract_err}") - - # Update aggregate statistics using whatever we extracted - self._update_statistics(False, token_usage, turn_count, execution_time) - - error_msg = f"Agent execution failed: {e}" - logger.error(error_msg, exc_info=True) - - # Log error to file if specified - if tool_call_log_file: - self._write_to_log_file(tool_call_log_file, f"\n| ERROR: {error_msg}\n") - - return self._create_error_response( - execution_time, - str(e), - conversation_output if conversation_output else [], - token_usage if token_usage else {}, - turn_count, - ) - - def _log_tool_calls_from_conversation( - self, conversation: list, log_file_path: 
str - ) -> None: - """ - Parse and log tool calls from the conversation output. - - Args: - conversation: List of conversation messages - log_file_path: Path to log file - """ - if not log_file_path: - return - - for msg in conversation: - if msg.get("role") == "assistant": - # Log text content - content = msg.get("content", "") - if content: - self._write_to_log_file(log_file_path, f"{content}\n") - - # Log tool calls - tool_calls = msg.get("tool_calls", []) - for tool_call in tool_calls: - if isinstance(tool_call, dict): - tool_name = tool_call.get("function", {}).get("name", "Unknown") - arguments = tool_call.get("function", {}).get("arguments", "") - - # Parse arguments if it's a JSON string - try: - if isinstance(arguments, str): - args_dict = json.loads(arguments) - args_str = json.dumps(args_dict, separators=(",", ": ")) - else: - args_str = json.dumps(arguments, separators=(",", ": ")) - except: - args_str = str(arguments) - - # Log tool call - logger.info(f"| {tool_name} {args_str[:140]}..." if len(args_str) > 140 else f"| {tool_name} {args_str}") - self._write_to_log_file(log_file_path, f"| {tool_name} {args_str}\n") - - async def _execute_with_streaming( - self, instruction: str, tool_call_log_file: str = None - ) -> Dict[str, Any]: - """ - Execute instruction with agent using streaming response. - - Args: - instruction: The instruction/prompt to execute - tool_call_log_file: Optional path to log tool calls (Service configuration is taken from self.service_config) - - Returns: - Dictionary containing execution results - """ - start_time = time.time() - - # Initialize partial results to preserve even on failure - partial_output = [] - partial_token_usage = {} - partial_turn_count = 0 - - try: - # Common setup - server, agent, conversation = await self._prepare_agent_execution(instruction) - - async with server: - # Run agent with streaming - result = Runner.run_streamed( - agent, - max_turns=self.MAX_TURNS, - input=conversation, - run_config=RunConfig(model_provider=self.model_provider), - ) - - # Add small delay to ensure background task starts - await asyncio.sleep(0.1) - - # Process streaming events - event_count = 0 - # Prefix each assistant output line with '| ' - line_prefix = "| " - at_line_start = True - last_event_type = None # Track the previous event type - - # Track if max_turns was exceeded - max_turns_exceeded = False - - try: - async for event in result.stream_events(): - event_count += 1 - logger.debug(f"Event {event_count}: {event}") - - if hasattr(event, "type"): - logger.debug(f"Event type: {event.type}") - - if event.type == "raw_response_event": - if hasattr(event, "data") and isinstance( - event.data, ResponseTextDeltaEvent - ): - delta_text = event.data.delta or "" - # Stream with line prefix, handling chunked newlines - for chunk in delta_text.splitlines( - True - ): # keepends=True - if at_line_start: - print(line_prefix, end="", flush=True) - print(chunk, end="", flush=True) - at_line_start = chunk.endswith("\n") - - # Also log text output to file (preserve original formatting) - if delta_text.strip(): # Only log non-empty content - self._write_to_log_file( - tool_call_log_file, delta_text - ) - - last_event_type = "text_output" - - elif event.type == "run_item_stream_event": - if ( - hasattr(event, "item") - and getattr(event.item, "type", "") - == "tool_call_item" - ): - if last_event_type == "text_output": - # Add newline if text wasn't already on a new line - if not at_line_start: - print("\n", end="", flush=True) - at_line_start = True - - tool_name 
= getattr( - getattr(event.item, "raw_item", None), - "name", - "Unknown", - ) - - arguments = getattr( - getattr(event.item, "raw_item", None), - "arguments", - None, - ) - - if isinstance(arguments, str): - display_arguments = ( - arguments[:140] + "..." - if len(arguments) > 140 - else arguments - ) - else: - # Convert non-string arguments to single-line JSON - try: - args_str = json.dumps( - arguments, separators=(",", ": ") - ) - display_arguments = ( - args_str[:140] + "..." - if len(args_str) > 140 - else args_str - ) - except Exception: - display_arguments = str(arguments)[:140] - logger.info( - f"| \033[1m{tool_name}\033[0m \033[2;37m{display_arguments}\033[0m" - ) - - # Also log tool call to log file (ensure proper line breaks) - args_str = ( - arguments - if isinstance(arguments, str) - else json.dumps( - arguments, separators=(",", ": ") - ) - ) - # Add newline before tool call if previous was text output - prefix = ( - "\n" if last_event_type == "text_output" else "" - ) - self._write_to_log_file( - tool_call_log_file, - f"{prefix}| {tool_name} {args_str}\n", - ) - - last_event_type = "tool_call" - - except Exception as stream_error: - error_msg = f"Error during streaming: {stream_error}" - logger.error(error_msg, exc_info=True) - # Also log error to file (ensure proper line break) - self._write_to_log_file( - tool_call_log_file, f"\n| ERROR: {error_msg}\n" - ) - - # Try to extract whatever conversation output we can get from the result - try: - if hasattr(result, "to_input_list"): - partial_output = result.to_input_list() - logger.debug( - f"Extracted partial output during stream error: {len(partial_output) if partial_output else 0} messages" - ) - except Exception as extract_error: - logger.debug( - f"Failed to extract output during stream error: {extract_error}" - ) - # Keep the existing partial_output - - # Try to extract token usage from any available raw responses - try: - if hasattr(result, "raw_responses") and result.raw_responses: - total_input_tokens = 0 - total_output_tokens = 0 - total_tokens = 0 - for response in result.raw_responses: - if hasattr(response, "usage") and response.usage: - total_input_tokens += ( - response.usage.input_tokens or 0 - ) - total_output_tokens += ( - response.usage.output_tokens or 0 - ) - total_tokens += response.usage.total_tokens or 0 - - partial_token_usage = { - "input_tokens": total_input_tokens, - "output_tokens": total_output_tokens, - "total_tokens": total_tokens, - } - logger.debug( - f"Extracted partial token usage during stream error: {partial_token_usage}" - ) - - # Try to extract turn count - if hasattr(result, "current_turn"): - partial_turn_count = max(result.current_turn - 1, 0) - logger.debug( - f"Extracted partial turn count during stream error: {partial_turn_count}" - ) - except Exception as usage_error: - logger.debug( - f"Failed to extract token usage during stream error: {usage_error}" - ) - # Keep the existing partial values - - # If this is a critical streaming error, we should fail the execution - # rather than continuing and potentially returning success=True - execution_time = time.time() - start_time - self._usage_stats["failed_executions"] += 1 - self._usage_stats["total_execution_time"] += execution_time - - # Update usage stats with any partial token usage we collected - if partial_token_usage: - self._usage_stats["total_input_tokens"] += ( - partial_token_usage.get("input_tokens", 0) - ) - self._usage_stats["total_output_tokens"] += ( - partial_token_usage.get("output_tokens", 0) - ) - 
self._usage_stats["total_tokens"] += partial_token_usage.get( - "total_tokens", 0 - ) - self._usage_stats["total_turns"] += partial_turn_count - - return { - "success": False, - "output": partial_output if partial_output else [], - "token_usage": partial_token_usage - if partial_token_usage - else {}, - "turn_count": partial_turn_count, - "execution_time": execution_time, - "error": str(stream_error), - } - - # Extract token usage from raw responses using helper - token_usage = self._extract_token_usage(result.raw_responses if hasattr(result, "raw_responses") else []) - if token_usage: - # Update partial token usage as we go - partial_token_usage = token_usage - else: - # If raw_responses is empty, try to extract from individual responses - logger.debug( - "No raw_responses found, checking for other response data" - ) - - # Extract turn count - turn_count = len(result.raw_responses if hasattr(result, "raw_responses") else []) - if turn_count: - partial_turn_count = turn_count - - # Try to extract partial conversation output in case of later failure - try: - partial_output = result.to_input_list() - except Exception as e: - logger.debug(f"Failed to extract conversation output: {e}") - # Keep whatever partial output we had before - - # Pretty usage block (prefixed lines) - if token_usage: - total_input_tokens = token_usage.get("input_tokens", 0) - total_output_tokens = token_usage.get("output_tokens", 0) - total_tokens = token_usage.get("total_tokens", 0) - - lines = [ - "\n| ────────────────────────────────────────────────", - "| \033[1mToken usage\033[0m", - "|", - f"| Total: {total_tokens:,} | Input: {total_input_tokens:,} | Output: {total_output_tokens:,}", - ] - if turn_count is not None: - lines.append( - "| ────────────────────────────────────────────────" - ) - lines.append(f"| \033[1mTurns\033[0m: {turn_count}") - lines.append( - "| ────────────────────────────────────────────────" - ) - logger.info("\n".join(lines)) - - # Extract conversation output - conversation_output = [] - try: - conversation_output = result.to_input_list() - except Exception as e: - logger.debug(f"Failed to extract final conversation output: {e}") - conversation_output = partial_output if partial_output else [] - - # Update partial results with final values - partial_output = conversation_output - partial_token_usage = ( - token_usage if token_usage else partial_token_usage - ) - partial_turn_count = turn_count if turn_count else partial_turn_count - - execution_time = time.time() - start_time - - # Check if we hit max_turns limit and adjust turn count - if max_turns_exceeded and turn_count: - # When max_turns is exceeded, SDK reports the turn it tried to start - # but didn't execute, so subtract 1 for actual completed turns - turn_count = turn_count - 1 - - # Check if we hit max_turns limit and should report as error - if max_turns_exceeded: - self._update_statistics(False, token_usage, turn_count or 0, execution_time) - return self._create_error_response( - execution_time, - f"Max turns ({turn_count if turn_count else 0}) exceeded", - conversation_output, - token_usage, - turn_count or 0 - ) - - # Update statistics and return success - self._update_statistics(True, token_usage, turn_count or 0, execution_time) - return self._create_success_response( - conversation_output, token_usage, turn_count or 0, execution_time - ) - - except Exception as e: - execution_time = time.time() - start_time - - # Update statistics with partial results - self._update_statistics( - False, - partial_token_usage, - 
partial_turn_count, - execution_time - ) - - error_msg = f"Agent execution failed: {e}" - logger.error(f"| {error_msg}", exc_info=True) - - # Log error to file if specified - if tool_call_log_file: - self._write_to_log_file(tool_call_log_file, f"\n| ERROR: {error_msg}\n") - - # Return error response with partial results preserved - return self._create_error_response( - execution_time, - str(e), - partial_output if partial_output else [], - partial_token_usage, - partial_turn_count - ) - - async def execute( - self, instruction: str, tool_call_log_file: str = None - ) -> Dict[str, Any]: - """ - Execute instruction without retries. - - Args: - instruction: The instruction/prompt to execute - tool_call_log_file: Optional path to log tool calls (Service configuration is taken from self.service_config) - - Returns: - Dictionary containing: - - success: bool - - output: conversation output (list of messages) - - token_usage: dict with token statistics - - turn_count: number of conversational turns - - execution_time: execution time in seconds - - error: error message if failed - """ - - if self.stream: - result = await asyncio.wait_for( - self._execute_with_streaming(instruction, tool_call_log_file), - timeout=self.timeout, - ) - else: - result = await asyncio.wait_for( - self._execute_without_streaming(instruction, tool_call_log_file), - timeout=self.timeout, - ) - - return result - - def execute_sync( - self, instruction: str, tool_call_log_file: str = None - ) -> Dict[str, Any]: - """ - Synchronous wrapper for execute method. - - Args: - instruction: The instruction/prompt to execute - tool_call_log_file: Optional path to log tool calls (Service configuration is taken from self.service_config) - - Returns: - Dictionary containing execution results - """ - try: - return asyncio.run(self.execute(instruction, tool_call_log_file)) - except asyncio.TimeoutError: - self._usage_stats["failed_executions"] += 1 - return { - "success": False, - "output": "", - "token_usage": {}, - "turn_count": 0, - "execution_time": self.timeout, - "error": f"Execution timed out after {self.timeout} seconds", - } - - def get_usage_stats(self) -> Dict[str, Any]: - """ - Get usage statistics for this agent. 
- - Returns: - Dictionary containing usage statistics - """ - stats = self._usage_stats.copy() - - # Calculate averages - total_executions = stats["successful_executions"] + stats["failed_executions"] - if total_executions > 0: - stats["avg_input_tokens"] = stats["total_input_tokens"] / total_executions - stats["avg_output_tokens"] = stats["total_output_tokens"] / total_executions - stats["avg_total_tokens"] = stats["total_tokens"] / total_executions - stats["avg_turns"] = stats["total_turns"] / total_executions - stats["avg_execution_time"] = ( - stats["total_execution_time"] / total_executions - ) - stats["success_rate"] = ( - stats["successful_executions"] / total_executions * 100 - ) - else: - stats.update( - { - "avg_input_tokens": 0.0, - "avg_output_tokens": 0.0, - "avg_total_tokens": 0.0, - "avg_turns": 0.0, - "avg_execution_time": 0.0, - "success_rate": 0.0, - } - ) - - return stats - - def reset_usage_stats(self): - """Reset usage statistics.""" - self._usage_stats = { - "total_input_tokens": 0, - "total_output_tokens": 0, - "total_tokens": 0, - "total_turns": 0, - "total_execution_time": 0.0, - "successful_executions": 0, - "failed_executions": 0, - } - - def __repr__(self): - return ( - f"MCPAgent(mcp_service='{self.mcp_service}', model='{self.model_name}', " - f"framework='{self.agent_framework}')" - ) - - -def main(): - """Example usage of the MCPAgent.""" - from dotenv import load_dotenv - - # Load environment variables - load_dotenv(dotenv_path=".mcp_env", override=False) - - # Example: Create a Notion agent - agent = MCPAgent( - model_name="gpt-4o", - api_key=os.getenv("OPENAI_API_KEY"), - base_url=os.getenv("OPENAI_BASE_URL"), - service="notion", - ) - - # Example execution - instruction = "List all pages in my Notion workspace" - - result = agent.execute_sync(instruction) - print(f"Success: {result['success']}") - print(f"Token usage: {result['token_usage']}") - print(f"Usage stats: {agent.get_usage_stats()}") - - -if __name__ == "__main__": - main() diff --git a/src/agents/__init__.py b/src/agents/__init__.py new file mode 100644 index 0000000..ccbf714 --- /dev/null +++ b/src/agents/__init__.py @@ -0,0 +1,11 @@ +""" +MCPMark Agent Module +==================== + +Provides a unified agent implementation using LiteLLM for model interactions +and minimal MCP server management. +""" + +from .mcpmark_agent import MCPMarkAgent + +__all__ = ["MCPMarkAgent"] \ No newline at end of file diff --git a/src/agents/mcp/__init__.py b/src/agents/mcp/__init__.py new file mode 100644 index 0000000..09824d3 --- /dev/null +++ b/src/agents/mcp/__init__.py @@ -0,0 +1,11 @@ +""" +MCP (Model Context Protocol) Components +======================================== + +Minimal MCP server implementations for MCPMark. +""" + +from .stdio_server import MCPStdioServer +from .http_server import MCPHttpServer + +__all__ = ["MCPStdioServer", "MCPHttpServer"] \ No newline at end of file diff --git a/src/agents/mcp/http_server.py b/src/agents/mcp/http_server.py new file mode 100644 index 0000000..9c0cc36 --- /dev/null +++ b/src/agents/mcp/http_server.py @@ -0,0 +1,78 @@ +""" +Minimal MCP HTTP Server Implementation +======================================= + +Provides HTTP-based MCP server communication for services like GitHub. 
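+
+A minimal usage sketch (illustrative only; the URL, token, and tool name below are
+placeholders, and the calls must run inside an async function):
+
+    headers = {"Authorization": "Bearer <token>"}
+    async with MCPHttpServer(url="https://example.com/mcp/", headers=headers) as server:
+        tools = await server.list_tools()   # tool definitions as plain dicts
+        result = await server.call_tool("some_tool", {"arg": "value"})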
+""" + +import asyncio +from contextlib import AsyncExitStack +from typing import Any, Dict, List, Optional + +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client + +class MCPHttpServer: + """ + HTTP-based MCP client using the official MCP Python SDK + (Streamable HTTP transport). + """ + + def __init__( + self, + url: str, + headers: Optional[Dict[str, str]] = None, + timeout: int = 30, + ): + self.url = url.rstrip("/") + self.headers = headers or {} + self.timeout = timeout + + self._stack: Optional[AsyncExitStack] = None + self.session: Optional[ClientSession] = None + self._tools_cache: Optional[List[Dict[str, Any]]] = None + + async def __aenter__(self): + await self.start() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.stop() + + async def start(self): + """Open Streamable HTTP transport and initialize MCP session.""" + self._stack = AsyncExitStack() + + read_stream, write_stream, _ = await self._stack.enter_async_context( + streamablehttp_client(self.url, headers=self.headers) + ) + + self.session = await self._stack.enter_async_context(ClientSession(read_stream, write_stream)) + await asyncio.wait_for(self.session.initialize(), timeout=self.timeout) + + async def stop(self): + """Close the session/transport cleanly.""" + if self._stack: + await self._stack.aclose() + self._stack = None + self.session = None + self._tools_cache = None + + async def list_tools(self) -> List[Dict[str, Any]]: + """Return tool definitions (cached).""" + if self._tools_cache is not None: + return self._tools_cache + if not self.session: + raise RuntimeError("MCP HTTP client not started") + + resp = await asyncio.wait_for(self.session.list_tools(), timeout=self.timeout) + self._tools_cache = [t.model_dump() for t in resp.tools] + return self._tools_cache + + async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any: + """Invoke a remote tool and return the structured result.""" + if not self.session: + raise RuntimeError("MCP HTTP client not started") + + result = await asyncio.wait_for(self.session.call_tool(name, arguments), timeout=self.timeout) + return result.model_dump() diff --git a/src/agents/mcp/stdio_server.py b/src/agents/mcp/stdio_server.py new file mode 100644 index 0000000..80368fd --- /dev/null +++ b/src/agents/mcp/stdio_server.py @@ -0,0 +1,46 @@ +""" +Minimal MCP Stdio Server Implementation +======================================== + +Provides stdio-based MCP server communication for services like +Notion, Filesystem, Playwright, and Postgres. 
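+
+A minimal usage sketch (illustrative only; the server package, directory, and tool
+name below are placeholders, and the calls must run inside an async function):
+
+    server = MCPStdioServer(
+        command="npx",
+        args=["-y", "@modelcontextprotocol/server-filesystem", "/path/to/test_dir"],
+    )
+    async with server:
+        tools = await server.list_tools()
+        result = await server.call_tool("list_directory", {"path": "/path/to/test_dir"})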
+""" + +import asyncio +import os +from contextlib import AsyncExitStack +from typing import Any, Dict, List, Optional + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +class MCPStdioServer: + """Lightweight wrapper around the official MCP Python SDK.""" + + def __init__(self, command: str, args: List[str], env: Optional[Dict[str, str]] = None, timeout: int = 120): + self.params = StdioServerParameters(command=command, args=args, env={**os.environ, **(env or {})}) + self.timeout = timeout + self._stack: Optional[AsyncExitStack] = None + self._streams = None + self.session: Optional[ClientSession] = None + + async def __aenter__(self): + self._stack = AsyncExitStack() + read, write = await self._stack.enter_async_context(stdio_client(self.params)) + self.session = await self._stack.enter_async_context(ClientSession(read, write)) + await asyncio.wait_for(self.session.initialize(), timeout=self.timeout) + return self + + async def __aexit__(self, exc_type, exc, tb): + if self._stack: + await self._stack.aclose() + self._stack = None + self.session = None + + async def list_tools(self) -> List[Dict[str, Any]]: + resp = await asyncio.wait_for(self.session.list_tools(), timeout=self.timeout) + return [t.model_dump() for t in resp.tools] + + async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any: + result = await asyncio.wait_for(self.session.call_tool(name, arguments), timeout=self.timeout) + return result.model_dump()  # as above, convert the result to a plain dict diff --git a/src/agents/mcpmark_agent.py b/src/agents/mcpmark_agent.py new file mode 100644 index 0000000..69e516b --- /dev/null +++ b/src/agents/mcpmark_agent.py @@ -0,0 +1,1198 @@ +""" +MCPMark Agent Implementation +============================ + +Unified agent using LiteLLM for all model interactions with minimal MCP support. +""" + +import asyncio +import json +import time +import uuid + +from typing import Any, Dict, List, Optional, Callable + +import httpx +import litellm +import nest_asyncio + +from src.logger import get_logger +from .mcp import MCPStdioServer, MCPHttpServer +from .utils import TokenUsageTracker + +# Apply nested asyncio support +nest_asyncio.apply() + +# Configure LiteLLM +litellm.suppress_debug_info = True + +logger = get_logger(__name__) + +class MCPMarkAgent: + """ + Unified agent for LLM and MCP server management using LiteLLM. + + - Claude models with extended thinking: native Anthropic API with a manual tool-calling loop + - Other models: LiteLLM completions with a manual MCP tool-calling loop + """ + + # Constants + MAX_TURNS = 100 + DEFAULT_TIMEOUT = 600 + SYSTEM_PROMPT = ( + "You are a helpful agent that uses tools iteratively to complete the user's task, " + "and when finished, provides the final answer or simply states \"Task completed\" without further tool calls." + ) + + # Service categories + STDIO_SERVICES = ["notion", "filesystem", "playwright", "playwright_webarena", "postgres"] + HTTP_SERVICES = ["github"] + + # Claude thinking budget mapping + CLAUDE_THINKING_BUDGETS = { + "low": 1024, + "medium": 2048, + "high": 4096 + } + + # ==================== Initialization and Configuration ==================== + + def __init__( + self, + litellm_input_model_name: str, + api_key: str, + base_url: str, + mcp_service: str, + timeout: int = DEFAULT_TIMEOUT, + service_config: Optional[Dict[str, Any]] = None, + service_config_provider: Optional[Callable[[], Dict]] = None, + reasoning_effort: Optional[str] = "default", + ): + """ + Initialize the MCPMark agent.
+ + Args: + litellm_input_model_name: Name of the LLM model, as passed to LiteLLM + api_key: API key for the model provider + base_url: Base URL for the model provider + mcp_service: MCP service type + timeout: Execution timeout in seconds + service_config: Service-specific configuration + service_config_provider: Optional provider for dynamic config + reasoning_effort: Reasoning effort level ("default", "minimal", "low", "medium", "high") + """ + self.litellm_input_model_name = litellm_input_model_name + self.api_key = api_key + self.base_url = base_url + self.mcp_service = mcp_service + self.timeout = timeout + self.service_config = service_config or {} + self._service_config_provider = service_config_provider + self.reasoning_effort = reasoning_effort + + # Detect if this is a Claude model + self.is_claude = self._is_anthropic_model(litellm_input_model_name) + + # Determine execution path: Claude with thinking or LiteLLM + self.use_claude_thinking = self.is_claude and reasoning_effort != "default" + + # Initialize usage tracker + self.usage_tracker = TokenUsageTracker() + + # Track the actual model name from responses + self.litellm_run_model_name = None + + logger.debug( + f"Initialized MCPMarkAgent for '{mcp_service}' with model '{litellm_input_model_name}' " + f"(Claude: {self.is_claude}, Thinking: {self.use_claude_thinking}, Reasoning: {reasoning_effort})" + ) + + + def __repr__(self): + return ( + f"MCPMarkAgent(service='{self.mcp_service}', model='{self.litellm_input_model_name}')" + ) + + def _is_anthropic_model(self, model_name: str) -> bool: + """Check if the model is an Anthropic model.""" + return "claude" in model_name.lower() + + + def _get_claude_thinking_budget(self) -> Optional[int]: + """Get thinking budget for Claude based on reasoning effort.""" + if not self.use_claude_thinking: + return None + return self.CLAUDE_THINKING_BUDGETS.get(self.reasoning_effort, 2048) + + + def _refresh_service_config(self): + """Refresh service config from provider if available.""" + if self._service_config_provider: + try: + latest_cfg = self._service_config_provider() or {} + self.service_config.update(latest_cfg) + except Exception as e: + logger.warning(f"| Failed to refresh service config: {e}") + + + + # ==================== Public Interface Methods ==================== + + async def execute( + self, + instruction: str, + tool_call_log_file: Optional[str] = None + ) -> Dict[str, Any]: + """ + Execute instruction with the agent.
+ + Args: + instruction: The instruction/prompt to execute + tool_call_log_file: Optional path to log tool calls + + Returns: + Dictionary containing execution results + """ + start_time = time.time() + + try: + # Refresh service configuration + self._refresh_service_config() + + # Execute with timeout control + async def _execute_with_strategy(): + if self.use_claude_thinking: + # Claude with thinking -> native Anthropic API with tools + return await self._execute_claude_native_with_tools( + instruction, tool_call_log_file + ) + else: + # All other cases -> LiteLLM with tools + return await self._execute_litellm_with_tools( + instruction, tool_call_log_file + ) + + # Apply timeout to the entire execution + result = await asyncio.wait_for( + _execute_with_strategy(), + timeout=self.timeout + ) + + execution_time = time.time() - start_time + + # Update usage statistics + self.usage_tracker.update( + success=result["success"], + token_usage=result.get("token_usage", {}), + turn_count=result.get("turn_count", 0), + execution_time=execution_time + ) + + result["execution_time"] = execution_time + return result + + except asyncio.TimeoutError: + execution_time = time.time() - start_time + error_msg = f"Execution timed out after {self.timeout} seconds" + logger.error(error_msg) + + self.usage_tracker.update( + success=False, + token_usage={}, + turn_count=0, + execution_time=execution_time + ) + + return { + "success": False, + "output": [], + "token_usage": {}, + "turn_count": 0, + "execution_time": execution_time, + "error": error_msg + } + + except Exception as e: + execution_time = time.time() - start_time + error_msg = f"Agent execution failed: {e}" + logger.error(error_msg, exc_info=True) + + self.usage_tracker.update( + success=False, + token_usage={}, + turn_count=0, + execution_time=execution_time + ) + + return { + "success": False, + "output": [], + "token_usage": {}, + "turn_count": 0, + "execution_time": execution_time, + "error": str(e) + } + + + def execute_sync( + self, + instruction: str, + tool_call_log_file: Optional[str] = None + ) -> Dict[str, Any]: + """ + Synchronous wrapper for execute method. + """ + try: + return asyncio.run(self.execute(instruction, tool_call_log_file)) + except asyncio.TimeoutError: + self.usage_tracker.update(False, {}, 0, self.timeout) + return { + "success": False, + "output": [], + "token_usage": {}, + "turn_count": 0, + "execution_time": self.timeout, + "error": f"Execution timed out after {self.timeout} seconds" + } + + + def get_usage_stats(self) -> Dict[str, Any]: + """Get usage statistics.""" + return self.usage_tracker.get_stats() + + + def reset_usage_stats(self): + """Reset usage statistics.""" + self.usage_tracker.reset() + + + + # ==================== Claude Native API Execution Path ==================== + + async def _execute_claude_native_with_tools( + self, + instruction: str, + tool_call_log_file: Optional[str] = None + ) -> Dict[str, Any]: + """ + Execute Claude with thinking using native Anthropic API. + Creates MCP server, gets tools, and executes with thinking. 
+ """ + logger.debug("Using Claude native API with thinking") + + thinking_budget = self._get_claude_thinking_budget() + + # Create and start MCP server + mcp_server = await self._create_mcp_server() + + try: + async with mcp_server: + # Get available tools + tools = await mcp_server.list_tools() + + # Convert MCP tools to Anthropic format + anthropic_tools = self._convert_to_anthropic_format(tools) + + # Execute with function calling loop + return await self._execute_anthropic_native_tool_loop( + instruction, anthropic_tools, mcp_server, + thinking_budget, tool_call_log_file + ) + + except Exception as e: + logger.error(f"Claude native execution failed: {e}") + return { + "success": False, + "output": [], + "token_usage": {}, + "turn_count": 0, + "error": str(e), + "litellm_run_model_name": self.litellm_run_model_name, + } + + + async def _call_claude_native_api( + self, + messages: List[Dict], + thinking_budget: int, + tools: Optional[List[Dict]] = None, + mcp_servers: Optional[List[Dict]] = None, + system: Optional[str] = None + ) -> Dict[str, Any]: + """ + Call Claude's native API directly using httpx. + + Args: + messages: Conversation messages + thinking_budget: Token budget for thinking + tools: Tool definitions for function calling + mcp_servers: MCP server configurations + system: System prompt + + Returns: + API response as dictionary + """ + # Get API base and headers + import os + api_base = os.getenv("ANTHROPIC_API_BASE", "https://api.anthropic.com") + headers = { + "x-api-key": self.api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + } + + # Build payload + max_tokens = max(thinking_budget + 4096, 4096) + payload = { + "model": self.litellm_input_model_name.replace("anthropic/", ""), + "max_tokens": max_tokens, + "messages": messages, + } + + # Add thinking configuration + if thinking_budget: + payload["thinking"] = { + "type": "enabled", + "budget_tokens": thinking_budget + } + + # Add tools if provided + if tools: + payload["tools"] = tools + payload["tool_choice"] = {"type": "auto"} + + # Add MCP servers if provided + if mcp_servers: + headers["anthropic-beta"] = "mcp-client-2025-04-04" + payload["mcp_servers"] = mcp_servers + + # Add system prompt if provided + if system: + payload["system"] = system + + # Make the API call + async with httpx.AsyncClient() as client: + try: + response = await client.post( + f"{api_base}/v1/messages", + headers=headers, + json=payload, + timeout=self.timeout + ) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error(f"Claude API error: {e.response.text}") + raise + except Exception as e: + logger.error(f"Claude API call failed: {e}") + raise + + + async def _execute_anthropic_native_tool_loop( + self, + instruction: str, + tools: List[Dict], + mcp_server: Any, + thinking_budget: int, + tool_call_log_file: Optional[str] = None + ) -> Dict[str, Any]: + """ + Execute Claude thinking loop with function calling. + Handles thinking blocks, tool calls, and message formatting. 
+ """ + messages = [{"role": "user", "content": instruction}] + total_tokens = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0, "reasoning_tokens": 0} + turn_count = 0 + max_turns = self.MAX_TURNS + hit_turn_limit = False + ended_normally = False + + system_text = self.SYSTEM_PROMPT + + for _ in range(max_turns): + turn_count += 1 + + # Call Claude native API + try: + response = await self._call_claude_native_api( + messages=messages, + thinking_budget=thinking_budget, + tools=tools, + system=system_text + ) + if turn_count == 1: + self.litellm_run_model_name = response['model'].split("/")[-1] + except Exception as e: + logger.error(f"Claude API call failed on turn {turn_count}: {e}") + break + + # Update token usage + if "usage" in response: + usage = response["usage"] + input_tokens = usage.get("input_tokens", 0) + output_tokens = usage.get("output_tokens", 0) + # Calculate output tokens as total - input for consistency + total_tokens_count = output_tokens + input_tokens + + total_tokens["input_tokens"] += input_tokens + total_tokens["output_tokens"] += output_tokens + total_tokens["total_tokens"] += total_tokens_count + + ## TODO: add reasoning tokens for claude + + # Extract blocks from response + blocks = response.get("content", []) + tool_uses = [b for b in blocks if b.get("type") == "tool_use"] + thinking_blocks = [b for b in blocks if b.get("type") == "thinking"] + text_blocks = [b for b in blocks if b.get("type") == "text"] + + # Log text output + for tb in text_blocks: + if tb.get("text") and tool_call_log_file: + with open(tool_call_log_file, 'a', encoding='utf-8') as f: + f.write(f"{tb['text']}\n") + if tb.get("text"): + for line in tb["text"].splitlines(): + logger.info(f"| {line}") + + # Build assistant message with all blocks + assistant_content = [] + + # Add thinking blocks + for tb in thinking_blocks: + assistant_content.append({ + "type": "thinking", + "thinking": tb.get("thinking", ""), + "signature": tb.get("signature", ""), + }) + + # Add text blocks + for tb in text_blocks: + if tb.get("text"): + assistant_content.append({"type": "text", "text": tb["text"]}) + + # Add tool_use blocks + for tu in tool_uses: + assistant_content.append({ + "type": "tool_use", + "id": tu.get("id"), + "name": tu.get("name"), + "input": tu.get("input", {}), + }) + + messages.append({"role": "assistant", "content": assistant_content}) + + # If no tool calls, we're done + if not tool_uses: + ended_normally = True + break + + # Execute tools and add results + tool_results = [] + for tu in tool_uses: + name = tu.get("name") + inputs = tu.get("input", {}) + + # Log tool call + args_str = json.dumps(inputs, separators=(",", ": ")) + display_args = args_str[:140] + "..." 
if len(args_str) > 140 else args_str + logger.info(f"| \033[1m{name}\033[0m \033[2;37m{display_args}\033[0m") + + if tool_call_log_file: + with open(tool_call_log_file, 'a', encoding='utf-8') as f: + f.write(f"| {name} {args_str}\n") + + # Execute tool + try: + result = await asyncio.wait_for( + mcp_server.call_tool(name, inputs), + timeout=60 + ) + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu["id"], + "content": [{"type": "text", "text": json.dumps(result)}], + }) + except Exception as e: + logger.error(f"Tool call failed: {e}") + tool_results.append({ + "type": "tool_result", + "tool_use_id": tu["id"], + "content": [{"type": "text", "text": f"Error: {str(e)}"}], + }) + + messages.append({"role": "user", "content": tool_results}) + + # Detect if we exited due to hitting the turn limit + if (not ended_normally) and (turn_count >= max_turns): + hit_turn_limit = True + logger.warning(f"| Max turns ({max_turns}) exceeded; returning failure with partial output.") + if tool_call_log_file: + try: + with open(tool_call_log_file, 'a', encoding='utf-8') as f: + f.write(f"| Max turns ({max_turns}) exceeded\n") + except Exception: + pass + + # Display final token usage + if total_tokens["total_tokens"] > 0: + log_msg = ( + f"|\n| Token usage: Total: {total_tokens['total_tokens']:,} | " + f"Input: {total_tokens['input_tokens']:,} | " + f"Output: {total_tokens['output_tokens']:,}" + ) + if total_tokens.get("reasoning_tokens", 0) > 0: + log_msg += f" | Reasoning: {total_tokens['reasoning_tokens']:,}" + logger.info(log_msg) + logger.info(f"| Turns: {turn_count}") + + # Convert messages to SDK format + sdk_format_messages = self._convert_to_sdk_format(messages) + + return { + "success": not hit_turn_limit, + "output": sdk_format_messages, + "token_usage": total_tokens, + "turn_count": turn_count, + "error": (f"Max turns ({max_turns}) exceeded" if hit_turn_limit else None), + "litellm_run_model_name": self.litellm_run_model_name, + } + + + # ==================== LiteLLM Execution Path ==================== + + async def _execute_litellm_with_tools( + self, + instruction: str, + tool_call_log_file: Optional[str] = None + ) -> Dict[str, Any]: + """ + Execute with manual MCP server management. + Used for all non-Anthropic models and Anthropic models with STDIO services. 
+ """ + logger.debug("Using manual MCP execution with function calling loop") + + # Create and start MCP server + mcp_server = await self._create_mcp_server() + + try: + async with mcp_server: + # Get available tools + tools = await mcp_server.list_tools() + + # Convert MCP tools to OpenAI function format + functions = self._convert_to_openai_format(tools) + + # Execute with function calling loop + return await self._execute_litellm_tool_loop( + instruction, functions, mcp_server, tool_call_log_file + ) + + except Exception as e: + logger.error(f"Manual MCP execution failed: {e}") + raise + + + async def _execute_litellm_tool_loop( + self, + instruction: str, + functions: List[Dict], + mcp_server: Any, + tool_call_log_file: Optional[str] = None + ) -> Dict[str, Any]: + """Execute function calling loop with LiteLLM.""" + messages = [ + {"role": "system", "content": self.SYSTEM_PROMPT}, + {"role": "user", "content": instruction} + ] + total_tokens = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0, "reasoning_tokens": 0} + turn_count = 0 + max_turns = self.MAX_TURNS # Limit turns to prevent infinite loops + consecutive_failures = 0 + max_consecutive_failures = 3 + hit_turn_limit = False + ended_normally = False + + # Convert functions to tools format for newer models + tools = [{"type": "function", "function": func} for func in functions] if functions else None + + try: + while turn_count < max_turns: + + # Build completion kwargs + completion_kwargs = { + "model": self.litellm_input_model_name, + "messages": messages, + "api_key": self.api_key, + } + + # Always use tools format if available - LiteLLM will handle conversion + if tools: + completion_kwargs["tools"] = tools + completion_kwargs["tool_choice"] = "auto" + + # Add reasoning_effort and base_url if specified + if self.reasoning_effort != "default": + completion_kwargs["reasoning_effort"] = self.reasoning_effort + if self.base_url: + completion_kwargs["base_url"] = self.base_url + + try: + # Call LiteLLM with timeout for individual call + response = await asyncio.wait_for( + litellm.acompletion(**completion_kwargs), + timeout = self.timeout / 2 # Use half of total timeout + ) + consecutive_failures = 0 # Reset failure counter on success + except asyncio.TimeoutError: + logger.warning(f"| ✗ LLM call timed out on turn {turn_count + 1}") + consecutive_failures += 1 + if consecutive_failures >= max_consecutive_failures: + raise Exception(f"Too many consecutive failures ({consecutive_failures})") + await asyncio.sleep(8 ** consecutive_failures) # Exponential backoff + continue + except Exception as e: + logger.error(f"| ✗ LLM call failed on turn {turn_count + 1}: {e}") + consecutive_failures += 1 + if consecutive_failures >= max_consecutive_failures: + raise + if "ratelimiterror" in str(e).lower(): + await asyncio.sleep(3 ** consecutive_failures) + else: + await asyncio.sleep(24 ** consecutive_failures) # Exponential backoff + continue + + # Extract actual model name from response (first turn only) + if turn_count == 0 and hasattr(response, 'model') and response.model: + self.litellm_run_model_name = response.model.split("/")[-1] + + # Update token usage including reasoning tokens + if hasattr(response, 'usage') and response.usage: + input_tokens = response.usage.prompt_tokens or 0 + total_tokens_count = response.usage.total_tokens or 0 + # Calculate output tokens as total - input for consistency + output_tokens = total_tokens_count - input_tokens if total_tokens_count > 0 else (response.usage.completion_tokens or 0) + + 
total_tokens["input_tokens"] += input_tokens + total_tokens["output_tokens"] += output_tokens + total_tokens["total_tokens"] += total_tokens_count + + # Extract reasoning tokens if available + if hasattr(response.usage, 'completion_tokens_details'): + details = response.usage.completion_tokens_details + if hasattr(details, 'reasoning_tokens'): + total_tokens["reasoning_tokens"] += details.reasoning_tokens or 0 + + # Get response message + choices = response.choices + if len(choices): + message = choices[0].message + else: + break + + # Convert to dict (prefer model_dump over deprecated dict()) + message_dict = message.model_dump() if hasattr(message, 'model_dump') else dict(message) + + # Log assistant's text content if present + if hasattr(message, 'content') and message.content: + # Display the content with line prefix + for line in message.content.splitlines(): + logger.info(f"| {line}") + + # Also log to file if specified + if tool_call_log_file: + with open(tool_call_log_file, 'a', encoding='utf-8') as f: + f.write(f"{message.content}\n") + + # Check for tool calls (newer format) + if hasattr(message, 'tool_calls') and message.tool_calls: + messages.append(message_dict) + turn_count += 1 + # Process tool calls + for tool_call in message.tool_calls: + func_name = tool_call.function.name + func_args = json.loads(tool_call.function.arguments) + + try: + result = await asyncio.wait_for( + mcp_server.call_tool(func_name, func_args), + timeout=60 + ) + messages.append({ + "role": "tool", + "tool_call_id": tool_call.id, + "content": json.dumps(result) + }) + except asyncio.TimeoutError: + error_msg = f"Tool call '{func_name}' timed out after 30 seconds" + logger.error(error_msg) + messages.append({ + "role": "tool", + "tool_call_id": tool_call.id, + "content": f"Error: {error_msg}" + }) + except Exception as e: + logger.error(f"Tool call failed: {e}") + messages.append({ + "role": "tool", + "tool_call_id": tool_call.id, + "content": f"Error: {str(e)}" + }) + + # Format arguments for display (truncate if too long) + args_str = json.dumps(func_args, separators=(",", ": ")) + display_arguments = args_str[:140] + "..." 
if len(args_str) > 140 else args_str + + # Log with ANSI color codes (bold tool name, dim gray arguments) + logger.info(f"| \033[1m{func_name}\033[0m \033[2;37m{display_arguments}\033[0m") + + if tool_call_log_file: + with open(tool_call_log_file, 'a', encoding='utf-8') as f: + f.write(f"| {func_name} {args_str}\n") + continue + else: + # No tool/function call, add message and we're done + messages.append(message_dict) + turn_count += 1 + ended_normally = True + break + except Exception as loop_error: + # On any error, return partial conversation, token usage, and turn count + logger.error(f"Manual MCP loop failed: {loop_error}", exc_info=True) + sdk_format_messages = self._convert_to_sdk_format(messages) + return { + "success": False, + "output": sdk_format_messages, + "token_usage": total_tokens, + "turn_count": turn_count, + "error": str(loop_error), + "litellm_run_model_name": self.litellm_run_model_name, + } + + # Detect if we exited due to hitting the turn limit + if (not ended_normally) and (turn_count >= max_turns): + hit_turn_limit = True + logger.warning(f"| Max turns ({max_turns}) exceeded); returning failure with partial output.") + if tool_call_log_file: + try: + with open(tool_call_log_file, 'a', encoding='utf-8') as f: + f.write(f"| Max turns ({max_turns}) exceeded\n") + except Exception: + pass + + # Display final token usage + if total_tokens["total_tokens"] > 0: + log_msg = ( + f"|\n| Token usage: Total: {total_tokens['total_tokens']:,} | " + f"Input: {total_tokens['input_tokens']:,} | " + f"Output: {total_tokens['output_tokens']:,}" + ) + if total_tokens.get("reasoning_tokens", 0) > 0: + log_msg += f" | Reasoning: {total_tokens['reasoning_tokens']:,}" + logger.info(log_msg) + logger.info(f"| Turns: {turn_count}") + + # Convert messages to SDK format for backward compatibility + sdk_format_messages = self._convert_to_sdk_format(messages) + + return { + "success": not hit_turn_limit, + "output": sdk_format_messages, + "token_usage": total_tokens, + "turn_count": turn_count, + "error": (f"Max turns ({max_turns}) exceeded" if hit_turn_limit else None), + "litellm_run_model_name": self.litellm_run_model_name + } + + + + # ==================== Format Conversion Methods ==================== + + def _convert_to_sdk_format(self, messages: List[Dict]) -> List[Dict]: + """Convert OpenAI messages format to old SDK format for backward compatibility.""" + sdk_format = [] + function_call_map = {} # Track function names to call IDs for legacy format + + for msg in messages: + role = msg.get("role") + + if role == "user": + # User messages stay mostly the same + user_content = msg.get("content", "") + + # Handle tool_result messages (content as list) + if isinstance(user_content, list): + # Check if this is a tool_result message + tool_results = [item for item in user_content if isinstance(item, dict) and item.get("type") == "tool_result"] + if tool_results: + # Convert tool_results to function_call_output format + for tr in tool_results: + content_items = tr.get("content", []) + text_content = "" + for ci in content_items: + if isinstance(ci, dict) and ci.get("type") == "text": + text_content = ci.get("text", "") + break + sdk_format.append({ + "call_id": tr.get("tool_use_id", ""), + "output": json.dumps({ + "type": "text", + "text": text_content, + "annotations": None, + "meta": None + }), + "type": "function_call_output" + }) + else: + # Regular user content as list - extract text + text_parts = [] + for item in user_content: + if isinstance(item, dict) and item.get("type") == "text": 
+ text_parts.append(item.get("text", "")) + sdk_format.append({ + "content": "\n".join(text_parts) if text_parts else "", + "role": "user" + }) + else: + # String content + sdk_format.append({ + "content": user_content, + "role": "user" + }) + + elif role == "assistant": + # === CHANGED ORDER START === + tool_calls = msg.get("tool_calls", []) + function_call = msg.get("function_call") + content = msg.get("content") + + # Handle both string content and list content (for Claude thinking) + if isinstance(content, list): + # Extract text from content blocks (e.g., Claude responses with thinking) + text_parts = [] + claude_tool_uses = [] + for block in content: + if isinstance(block, dict): + if block.get("type") == "text": + text_parts.append(block.get("text", "")) + elif block.get("type") == "thinking": + # Include thinking in output (marked as such) + thinking_text = block.get("thinking", "") + if thinking_text: + text_parts.append(f"\n{thinking_text}\n") + elif block.get("type") == "tool_use": + # Store tool_use blocks for later processing + claude_tool_uses.append(block) + content = "\n".join(text_parts) if text_parts else "" + + # Add Claude tool_uses to regular tool_calls + if claude_tool_uses and not tool_calls: + tool_calls = [] + for tu in claude_tool_uses: + tool_calls.append({ + "id": tu.get("id"), + "function": { + "name": tu.get("name"), + "arguments": json.dumps(tu.get("input", {})) + } + }) + + # 1) First add assistant's text content (if present) + if content: + sdk_format.append({ + "id": "__fake_id__", + "content": [ + { + "annotations": [], + "text": content if content else "", + "type": "output_text" + } + ], + "role": "assistant", + "status": "completed", + "type": "message" + }) + + # 2) Then add (new format) tool_calls + if tool_calls: + for tool_call in tool_calls: + call_id = tool_call.get("id", f"call_{uuid.uuid4().hex}") + func_name = tool_call.get("function", {}).get("name", "") + sdk_format.append({ + "arguments": tool_call.get("function", {}).get("arguments", "{}"), + "call_id": call_id, + "name": func_name, + "type": "function_call", + "id": "__fake_id__" + }) + + # 3) Finally handle (legacy format) function_call + if function_call: + func_name = function_call.get("name", "") + call_id = f"call_{uuid.uuid4().hex}" + function_call_map[func_name] = call_id # Store for matching responses + sdk_format.append({ + "arguments": function_call.get("arguments", "{}"), + "call_id": call_id, + "name": func_name, + "type": "function_call", + "id": "__fake_id__" + }) + + # 4) If neither content nor any calls exist, maintain fallback behavior + if not content and not tool_calls and not function_call: + sdk_format.append({ + "id": "__fake_id__", + "content": [ + { + "annotations": [], + "text": "", + "type": "output_text" + } + ], + "role": "assistant", + "status": "completed", + "type": "message" + }) + # === CHANGED ORDER END === + + elif role == "tool": + # Tool responses + sdk_format.append({ + "call_id": msg.get("tool_call_id", ""), + "output": json.dumps({ + "type": "text", + "text": msg.get("content", ""), + "annotations": None, + "meta": None + }), + "type": "function_call_output" + }) + + elif role == "function": + # Legacy function responses - try to match with stored call ID + func_name = msg.get("name", "") + call_id = function_call_map.get(func_name, f"call_{uuid.uuid4().hex}") + sdk_format.append({ + "call_id": call_id, + "output": json.dumps({ + "type": "text", + "text": msg.get("content", ""), + "annotations": None, + "meta": None + }), + "type": 
"function_call_output" + }) + + return sdk_format + + + + def _convert_to_anthropic_format(self, tools: List[Dict]) -> List[Dict]: + """Convert MCP tool definitions to Anthropic format.""" + anthropic_tools = [] + + for tool in tools: + anthropic_tool = { + "name": tool.get("name"), + "description": tool.get("description", ""), + "input_schema": tool.get("inputSchema", { + "type": "object", + "properties": {}, + "required": [] + }) + } + anthropic_tools.append(anthropic_tool) + + return anthropic_tools + + def _is_gemini_model(self) -> bool: + """Check if the model is a Gemini model.""" + model_lower = self.litellm_input_model_name.lower() + return "gemini" in model_lower or "bison" in model_lower + + def _simplify_schema_for_gemini(self, schema: Dict) -> Dict: + """ + Simplify nested schemas for Gemini compatibility. + Gemini has issues with deeply nested array type definitions. + + Note: This is a compatibility layer for Gemini API via LiteLLM. + Can be removed once LiteLLM handles this internally. + """ + if not isinstance(schema, dict): + return schema + + simplified = {} + + for key, value in schema.items(): + if key == "type" and isinstance(value, list): + # Gemini doesn't like type as array, use first type + simplified[key] = value[0] if value else "string" + elif key == "items" and isinstance(value, dict): + # Recursively simplify items + simplified[key] = self._simplify_schema_for_gemini(value) + elif key == "properties" and isinstance(value, dict): + # Recursively simplify each property + simplified[key] = { + prop_key: self._simplify_schema_for_gemini(prop_val) + for prop_key, prop_val in value.items() + } + elif isinstance(value, dict): + # Recursively simplify nested objects + simplified[key] = self._simplify_schema_for_gemini(value) + elif isinstance(value, list) and key not in ["required", "enum"]: + # For non-special arrays, check if they contain schemas + simplified[key] = [ + self._simplify_schema_for_gemini(item) if isinstance(item, dict) else item + for item in value + ] + else: + simplified[key] = value + + return simplified + + + def _convert_to_openai_format(self, tools: List[Dict]) -> List[Dict]: + """ + Convert MCP tool definitions to OpenAI function format. + + For Gemini models, applies schema simplification to handle + compatibility issues with deeply nested array type definitions. 
+ """ + functions = [] + is_gemini = self._is_gemini_model() + + if is_gemini: + logger.debug(f"Detected Gemini model: {self.litellm_input_model_name}") + logger.debug(f"Processing {len(tools)} tools for Gemini compatibility") + + for i, tool in enumerate(tools): + # Get the input schema + input_schema = tool.get("inputSchema", { + "type": "object", + "properties": {}, + "required": [] + }) + + # Simplify schema for Gemini if needed + if is_gemini: + original_schema = input_schema.copy() # Keep for debugging + input_schema = self._simplify_schema_for_gemini(input_schema) + + # Log significant changes for debugging + if input_schema != original_schema: + logger.debug(f"Simplified schema for tool #{i} '{tool.get('name')}'") + + function = { + "name": tool.get("name"), + "description": tool.get("description", ""), + "parameters": input_schema + } + functions.append(function) + + if is_gemini: + logger.info(f"Converted {len(functions)} tools for Gemini model with schema simplification") + + return functions + + + + + # ==================== MCP Server Management ==================== + + async def _create_mcp_server(self) -> Any: + """Create and return an MCP server instance.""" + if self.mcp_service in self.STDIO_SERVICES: + return self._create_stdio_server() + elif self.mcp_service in self.HTTP_SERVICES: + return self._create_http_server() + else: + raise ValueError(f"Unsupported MCP service: {self.mcp_service}") + + + def _create_stdio_server(self) -> MCPStdioServer: + """Create stdio-based MCP server.""" + if self.mcp_service == "notion": + notion_key = self.service_config.get("notion_key") + if not notion_key: + raise ValueError("Notion API key required") + + return MCPStdioServer( + command="npx", + args=["-y", "@notionhq/notion-mcp-server"], + env={ + "OPENAPI_MCP_HEADERS": ( + '{"Authorization": "Bearer ' + notion_key + '", ' + '"Notion-Version": "2022-06-28"}' + ) + } + ) + + elif self.mcp_service == "filesystem": + test_directory = self.service_config.get("test_directory") + if not test_directory: + raise ValueError("Test directory required for filesystem service") + + return MCPStdioServer( + command="npx", + args=["-y", "@modelcontextprotocol/server-filesystem", str(test_directory)] + ) + + elif self.mcp_service in ["playwright", "playwright_webarena"]: + browser = self.service_config.get("browser", "chromium") + headless = self.service_config.get("headless", True) + viewport_width = self.service_config.get("viewport_width", 1280) + viewport_height = self.service_config.get("viewport_height", 720) + + args = ["-y", "@playwright/mcp@latest"] + if headless: + args.append("--headless") + args.extend([ + "--isolated", + "--no-sandbox", + "--browser", browser, + "--viewport-size", f"{viewport_width},{viewport_height}" + ]) + + return MCPStdioServer(command="npx", args=args) + + elif self.mcp_service == "postgres": + host = self.service_config.get("host", "localhost") + port = self.service_config.get("port", 5432) + username = self.service_config.get("username") + password = self.service_config.get("password") + database = self.service_config.get("current_database") or self.service_config.get("database") + + if not all([username, password, database]): + raise ValueError("PostgreSQL requires username, password, and database") + + database_url = f"postgresql://{username}:{password}@{host}:{port}/{database}" + + return MCPStdioServer( + command="pipx", + args=["run", "postgres-mcp", "--access-mode=unrestricted"], + env={"DATABASE_URI": database_url} + ) + + else: + raise 
ValueError(f"Unsupported stdio service: {self.mcp_service}") + + + def _create_http_server(self) -> MCPHttpServer: + """Create HTTP-based MCP server.""" + if self.mcp_service == "github": + github_token = self.service_config.get("github_token") + if not github_token: + raise ValueError("GitHub token required") + + return MCPHttpServer( + url="https://api.githubcopilot.com/mcp/", + headers={ + "Authorization": f"Bearer {github_token}", + "User-Agent": "MCPMark/1.0" + } + ) + else: + raise ValueError(f"Unsupported HTTP service: {self.mcp_service}") + diff --git a/src/agents/utils/__init__.py b/src/agents/utils/__init__.py new file mode 100644 index 0000000..f5b2242 --- /dev/null +++ b/src/agents/utils/__init__.py @@ -0,0 +1,8 @@ +""" +Utility functions for MCPMark Agent +==================================== +""" + +from .token_usage import TokenUsageTracker + +__all__ = ["TokenUsageTracker"] \ No newline at end of file diff --git a/src/agents/utils/token_usage.py b/src/agents/utils/token_usage.py new file mode 100644 index 0000000..17ac0f4 --- /dev/null +++ b/src/agents/utils/token_usage.py @@ -0,0 +1,78 @@ +""" +Token Usage Tracking Utilities +=============================== +""" + +from typing import Dict, Any + + +class TokenUsageTracker: + """Track token usage across agent executions.""" + + def __init__(self): + """Initialize token usage tracker.""" + self.reset() + + def reset(self): + """Reset all usage statistics.""" + self._stats = { + "total_input_tokens": 0, + "total_output_tokens": 0, + "total_tokens": 0, + "total_turns": 0, + "total_execution_time": 0.0, + "successful_executions": 0, + "failed_executions": 0, + } + + def update(self, success: bool, token_usage: Dict[str, int], + turn_count: int, execution_time: float): + """ + Update usage statistics. + + Args: + success: Whether execution was successful + token_usage: Token usage dict with input_tokens, output_tokens, total_tokens + turn_count: Number of conversation turns + execution_time: Execution time in seconds + """ + if success: + self._stats["successful_executions"] += 1 + else: + self._stats["failed_executions"] += 1 + + self._stats["total_input_tokens"] += token_usage.get("input_tokens", 0) + self._stats["total_output_tokens"] += token_usage.get("output_tokens", 0) + self._stats["total_tokens"] += token_usage.get("total_tokens", 0) + self._stats["total_turns"] += turn_count + self._stats["total_execution_time"] += execution_time + + def get_stats(self) -> Dict[str, Any]: + """ + Get usage statistics with calculated averages. 
+ + Returns: + Dictionary containing usage statistics + """ + stats = self._stats.copy() + + # Calculate averages + total_executions = stats["successful_executions"] + stats["failed_executions"] + if total_executions > 0: + stats["avg_input_tokens"] = stats["total_input_tokens"] / total_executions + stats["avg_output_tokens"] = stats["total_output_tokens"] / total_executions + stats["avg_total_tokens"] = stats["total_tokens"] / total_executions + stats["avg_turns"] = stats["total_turns"] / total_executions + stats["avg_execution_time"] = stats["total_execution_time"] / total_executions + stats["success_rate"] = (stats["successful_executions"] / total_executions * 100) + else: + stats.update({ + "avg_input_tokens": 0.0, + "avg_output_tokens": 0.0, + "avg_total_tokens": 0.0, + "avg_turns": 0.0, + "avg_execution_time": 0.0, + "success_rate": 0.0, + }) + + return stats \ No newline at end of file diff --git a/src/aggregators/aggregate_results.py b/src/aggregators/aggregate_results.py index c2e8344..911984c 100644 --- a/src/aggregators/aggregate_results.py +++ b/src/aggregators/aggregate_results.py @@ -54,6 +54,101 @@ def discover_service_model_dirs(base_dir: Path) -> List[Path]: return [d for d in base_dir.iterdir() if d.is_dir() and "__" in d.name] +def discover_service_model_dirs_in_exp(exp_dir: Path) -> List[Path]: + """Discover service_model directories at experiment root (new layout).""" + return discover_service_model_dirs(exp_dir) + + +def discover_run_names(exp_dir: Path) -> List[str]: + """Discover run names (run-*) under service_model directories (new layout).""" + sm_dirs = discover_service_model_dirs_in_exp(exp_dir) + run_names_set = set() + for sm in sm_dirs: + if not sm.is_dir(): + continue + for d in sm.iterdir(): + if d.is_dir() and d.name.startswith("run-"): + run_names_set.add(d.name) + return sorted(run_names_set) + + +def detect_layout_and_runs(exp_dir: Path) -> tuple[str, List[str]]: + """Detect directory layout and available run names. + + Returns (layout, run_names): + - layout: 'root_runs' | 'nested_runs' | 'single' + - run_names: list like ['run-1', 'run-2', ...] 
(for single, defaults to ['run-1']) + """ + # Old layout: run-* at root + root_runs = discover_run_directories(exp_dir) + if root_runs: + return "root_runs", [d.name for d in root_runs] + + # New layout: service_model at root, with run-* inside + sm_dirs = discover_service_model_dirs_in_exp(exp_dir) + run_names_set = set() + for sm in sm_dirs: + if not sm.is_dir(): + continue + for d in sm.iterdir(): + if d.is_dir() and d.name.startswith("run-"): + run_names_set.add(d.name) + + if run_names_set: + return "nested_runs", sorted(run_names_set) + + # Fallback single + return "single", ["run-1"] + + +def collect_task_results_from_nested( + exp_dir: Path, run_name: str, force: bool = False +) -> Dict[str, Dict[str, Any]]: + """Collect task results for the new layout: service_model at root and run-* inside each.""" + results = {} + for service_model_dir in discover_service_model_dirs_in_exp(exp_dir): + run_dir = service_model_dir / run_name + if not run_dir.exists() or not run_dir.is_dir(): + continue + + service_model = service_model_dir.name + for task_dir in run_dir.iterdir(): + if not task_dir.is_dir(): + continue + if "__" not in task_dir.name: + continue + meta_path = task_dir / "meta.json" + if not meta_path.exists(): + continue + try: + with open(meta_path, "r", encoding="utf-8") as f: + meta = json.load(f) + + # Skip results with pipeline errors unless force=True + if not force and has_pipeline_errors(meta): + continue + + # Use directory name as task_name (category_id__task_id format) + task_name = task_dir.name + task_key = f"{service_model}__{task_name}" + results[task_key] = { + "success": meta.get("execution_result", {}).get("success", False), + "error_message": meta.get("execution_result", {}).get( + "error_message" + ), + "agent_execution_time": meta.get("agent_execution_time", 0), + "task_execution_time": meta.get("task_execution_time", 0), + "token_usage": meta.get("token_usage", {}), + "turn_count": meta.get("turn_count", 0), + "model_name_name": meta.get("model_name_name"), + "meta": meta, # Keep full meta for model_results + } + except Exception as e: + print(f"⚠️ Error reading {meta_path}: {e}") + continue + + return results + def has_pipeline_errors(meta: Dict[str, Any]) -> bool: """Check if a task result contains pipeline errors.""" error_msg = meta.get("execution_result", {}).get("error_message", "") @@ -127,7 +222,7 @@ def collect_task_results_from_run( "task_execution_time": meta.get("task_execution_time", 0), "token_usage": meta.get("token_usage", {}), "turn_count": meta.get("turn_count", 0), - "actual_model_name": meta.get("actual_model_name"), + "model_name_name": meta.get("model_name_name"), "meta": meta, # Keep full meta for model_results } except Exception as e: @@ -164,13 +259,13 @@ def calculate_k_run_metrics( "total_output_tokens": 0, "total_tokens": 0, "total_turns": 0, - "actual_model_name": None, + "model_name_name": None, } ) # Process each task for task_key in all_task_keys: - # Extract service__model from service__model__category__task format + # Extract model__service from model__service__category__task format parts = task_key.split("__") if len(parts) >= 2: service_model = f"{parts[0]}__{parts[1]}" @@ -240,14 +335,14 @@ def calculate_k_run_metrics( service_model_metrics[service_model]["total_tokens"] += sum(task_total_tokens) service_model_metrics[service_model]["total_turns"] += sum(task_turns) - # Store actual_model_name from first available task result - if service_model_metrics[service_model]["actual_model_name"] is None: + # Store model_name_name 
from first available task result + if service_model_metrics[service_model]["model_name_name"] is None: for run_name in runs_to_process: if run_name in all_runs_results: run_results = all_runs_results[run_name] task_result = run_results.get(task_key) - if task_result and task_result.get("actual_model_name"): - service_model_metrics[service_model]["actual_model_name"] = task_result["actual_model_name"] + if task_result and task_result.get("model_name_name"): + service_model_metrics[service_model]["model_name_name"] = task_result["model_name_name"] break # pass@1: Will be calculated as avg@k later (skip individual task counting) @@ -358,12 +453,20 @@ def calculate_k_run_metrics( def aggregate_single_run_results( exp_dir: Path, force: bool = False ) -> Dict[str, Dict[str, Any]]: - """Aggregate results for single-run experiment.""" + """Aggregate results for single-run experiment. + + New layout only: results/{exp}/service__model/run-1/{category__task}/... + """ service_model_results = {} - for service_model_dir in discover_service_model_dirs(exp_dir): + for service_model_dir in discover_service_model_dirs_in_exp(exp_dir): service_model = service_model_dir.name + # Determine task root directory: run-1 in new layout + task_root = service_model_dir / "run-1" + if not task_root.exists() or not task_root.is_dir(): + continue + # Collect task results total_tasks = 0 successful_tasks = 0 @@ -372,9 +475,9 @@ def aggregate_single_run_results( total_output_tokens = 0 total_tokens = 0 total_turns = 0 - actual_model_name = None + model_name_name = None - for task_dir in service_model_dir.iterdir(): + for task_dir in task_root.iterdir(): if not task_dir.is_dir(): continue @@ -403,9 +506,9 @@ def aggregate_single_run_results( total_tokens += token_usage.get("total_tokens", 0) or 0 total_turns += meta.get("turn_count", 0) or 0 - # Store actual_model_name from first available meta - if actual_model_name is None and meta.get("actual_model_name"): - actual_model_name = meta.get("actual_model_name") + # Store model_name_name from first available meta + if model_name_name is None and meta.get("model_name_name"): + model_name_name = meta.get("model_name_name") except Exception as e: print(f"⚠️ Error reading {meta_path}: {e}") @@ -428,7 +531,7 @@ def aggregate_single_run_results( "avg_output_tokens": round(total_output_tokens / total_tasks, 4), "avg_total_tokens": round(total_tokens / total_tasks, 4), "avg_turns": round(total_turns / total_tasks, 4), - "actual_model_name": actual_model_name, + "model_name_name": model_name_name, } return service_model_results @@ -456,7 +559,7 @@ def create_simplified_summary( for service_model in service_model_results.keys(): if "__" in service_model: - service, model = service_model.split("__", 1) + model, service = service_model.split("__", 1) all_services.add(service) all_models.add(model) @@ -475,7 +578,7 @@ def create_simplified_summary( model_service_data = [] for service_model, metrics in service_model_results.items(): - if "__" in service_model and service_model.split("__", 1)[1] == model: + if "__" in service_model and service_model.split("__", 1)[0] == model: model_metrics["total_tasks"] += metrics["total_tasks"] model_metrics["total_agent_execution_time"] += metrics.get( "total_agent_execution_time", 0 @@ -529,11 +632,11 @@ def create_simplified_summary( model_metrics["per_run_output_tokens"] = per_run_output_tokens model_metrics["per_run_cost"] = per_run_cost - # Set actual_model_name from first available service data + # Set model_name_name from first available service 
data for service_model, metrics in service_model_results.items(): - if "__" in service_model and service_model.split("__", 1)[1] == model: - if metrics.get("actual_model_name"): - model_metrics["actual_model_name"] = metrics["actual_model_name"] + if "__" in service_model and service_model.split("__", 1)[0] == model: + if metrics.get("model_name_name"): + model_metrics["model_name_name"] = metrics["model_name_name"] break if k > 1: @@ -697,8 +800,8 @@ def create_simplified_summary( "per_run_cost": per_run_cost, } - # Add actual_model_name to service-level metrics - formatted_metrics["actual_model_name"] = metrics.get("actual_model_name") + # Add model_name_name to service-level metrics + formatted_metrics["model_name_name"] = metrics.get("model_name_name") summary[service][model] = formatted_metrics return summary @@ -711,12 +814,10 @@ def generate_model_results( if single_run_models is None: single_run_models = [] - if k > 1: - model_results_dir = exp_dir / "model_results" - run_dirs = discover_run_directories(exp_dir) - else: - model_results_dir = exp_dir / "model_results" - run_dirs = [exp_dir] + model_results_dir = exp_dir / "model_results" + run_names = discover_run_names(exp_dir) + if not run_names: + run_names = ["run-1"] # Remove existing model_results if it exists if model_results_dir.exists(): @@ -727,23 +828,25 @@ def generate_model_results( # Collect all task data organized by model model_task_data = defaultdict(dict) - for run_idx, run_dir in enumerate(run_dirs, 1): - run_name = f"run-{run_idx}" if k > 1 else "run-1" - - for service_model_dir in discover_service_model_dirs(run_dir): + for run_name in run_names: + for service_model_dir in discover_service_model_dirs_in_exp(exp_dir): service_model = service_model_dir.name if "__" not in service_model: continue - service, model = service_model.split("__", 1) + model, service = service_model.split("__", 1) # For single-run models, only process run-1 is_single_run_model = any(m in model for m in single_run_models) if is_single_run_model and run_name != "run-1": continue - for task_dir in service_model_dir.iterdir(): + run_dir = service_model_dir / run_name + if not run_dir.exists() or not run_dir.is_dir(): + continue + + for task_dir in run_dir.iterdir(): if not task_dir.is_dir(): continue @@ -811,12 +914,10 @@ def generate_task_results( if single_run_models is None: single_run_models = [] - if k > 1: - task_results_dir = exp_dir / "task_results" - run_dirs = discover_run_directories(exp_dir) - else: - task_results_dir = exp_dir / "task_results" - run_dirs = [exp_dir] + task_results_dir = exp_dir / "task_results" + run_names = discover_run_names(exp_dir) + if not run_names: + run_names = ["run-1"] # Remove existing task_results if it exists if task_results_dir.exists(): @@ -833,23 +934,25 @@ def generate_task_results( } ) - for run_idx, run_dir in enumerate(run_dirs, 1): - run_name = f"run-{run_idx}" if k > 1 else "run-1" - - for service_model_dir in discover_service_model_dirs(run_dir): + for run_name in run_names: + for service_model_dir in discover_service_model_dirs_in_exp(exp_dir): service_model = service_model_dir.name if "__" not in service_model: continue - service, model = service_model.split("__", 1) + model, service = service_model.split("__", 1) # For single-run models, only process run-1 is_single_run_model = any(m in model for m in single_run_models) if is_single_run_model and run_name != "run-1": continue - for task_dir in service_model_dir.iterdir(): + run_dir = service_model_dir / run_name + if not run_dir.exists() 
or not run_dir.is_dir(): + continue + + for task_dir in run_dir.iterdir(): if not task_dir.is_dir(): continue @@ -1358,28 +1461,26 @@ def main(): if args.force: print("⚠️ Using --force: including incomplete/invalid results") - # Detect experiment type - run_dirs = discover_run_directories(exp_dir) - k = len(run_dirs) if run_dirs else 1 - - if k > 1: - print(f"📊 Detected {k}-run experiment structure") + # Only support new layout (runs nested under service_model). If no run-*, treat as single run. + run_names = discover_run_names(exp_dir) + if run_names: + k = len(run_names) + print(f"📊 Detected {k}-run experiment structure (runs nested under service_model)") - # Collect results from all runs + # Collect results from all nested runs all_runs_results = {} - for run_dir in run_dirs: - run_name = run_dir.name + for run_name in run_names: print(f" Processing {run_name}...") - run_results = collect_task_results_from_run(run_dir, args.force) + run_results = collect_task_results_from_nested(exp_dir, run_name, args.force) all_runs_results[run_name] = run_results # Calculate k-run metrics service_model_metrics = calculate_k_run_metrics( all_runs_results, k, single_run_models ) - else: - print("📊 Detected single-run experiment") + print("📊 Detected single-run experiment (new layout expected under run-1)") + k = 1 service_model_metrics = aggregate_single_run_results(exp_dir, args.force) if not service_model_metrics: @@ -1393,10 +1494,7 @@ def main(): ) # Save summary.json - if k > 1: - summary_path = exp_dir / "summary.json" - else: - summary_path = exp_dir / "summary.json" + summary_path = exp_dir / "summary.json" with open(summary_path, "w", encoding="utf-8") as f: json.dump(summary, f, indent=2, ensure_ascii=False) diff --git a/src/base/task_manager.py b/src/base/task_manager.py index 7e1efb6..6592300 100644 --- a/src/base/task_manager.py +++ b/src/base/task_manager.py @@ -214,12 +214,9 @@ def execute_task(self, task: BaseTask, agent_result: Dict[str, Any]) -> TaskResu if verification_error: logger.error(f"| Verification Error: {verification_error}") - # Overall task success: BOTH agent AND verification must succeed - overall_success = agent_success and verification_success - return TaskResult( task_name=task.name, - success=overall_success, + success=verification_success, error_message=agent_error, # Agent execution error verification_error=verification_error, # Verification error verification_output=verification_output, # Verification output diff --git a/src/errors.py b/src/errors.py index ca2742a..36eb8c3 100644 --- a/src/errors.py +++ b/src/errors.py @@ -9,23 +9,26 @@ from typing import Optional -# Retryable error patterns +"""Retryable error detection via minimal substring matching (lower-case).""" + +# Keep this list short and generic; aim to catch API/infrastructure issues only. 
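+# Matching is case-insensitive substring search; see is_retryable_error() below.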
RETRYABLE_PATTERNS = { - "timeout", - "timed out", - "etimedout", - "econnrefused", - "connection refused", - "network error", + "ratelimit", # e.g., RateLimitError, too many requests + "timeout", # any timeout wording + "connection", # connection refused/reset/error + "unavailable", # service unavailable + "internal server error", # 500s + "network error", # generic network issue + "quota", # budget/quota exceeded + # pipeline infra signals "mcp network error", "state duplication error", - "already exists", } def is_retryable_error(error: str) -> bool: - """Check if an error message indicates it should be retried.""" - error_lower = str(error).lower() + """Return True if the error string contains any retryable pattern.""" + error_lower = str(error or "").lower() return any(pattern in error_lower for pattern in RETRYABLE_PATTERNS) @@ -57,8 +60,3 @@ def standardize_error_message(error: str, mcp_service: Optional[str] = None) -> return f"{mcp_service.title()} {base_msg}" return base_msg - - -def get_retry_delay(attempt: int, base_delay: int = 5) -> int: - """Get exponential backoff delay for retries.""" - return min(base_delay * (2 ** (attempt - 1)), 60) # Cap at 60 seconds diff --git a/src/evaluator.py b/src/evaluator.py index 819ad7f..d4e164d 100644 --- a/src/evaluator.py +++ b/src/evaluator.py @@ -10,12 +10,8 @@ from src.factory import MCPServiceFactory from src.model_config import ModelConfig from src.results_reporter import EvaluationReport, ResultsReporter, TaskResult -from src.agent import MCPAgent - -PIPELINE_RETRY_ERRORS: List[str] = [ - "State Duplication Error", - "MCP Network Error", -] +from src.errors import is_retryable_error +from src.agents import MCPMarkAgent # Initialize logger logger = get_logger(__name__) @@ -29,18 +25,23 @@ def __init__( timeout: int = 300, exp_name: str = "test-run", output_dir: Path = None, - stream: bool = False, + reasoning_effort: str = "default", ): # Main configuration self.mcp_service = mcp_service - self.model = model self.timeout = timeout - + # Initialize model configuration - model_config = ModelConfig(model) - self.actual_model_name = model_config.actual_model_name - self.base_url = model_config.base_url + self.reasoning_effort = reasoning_effort + self.model_name = model + + model_config = ModelConfig(self.model_name) self.api_key = model_config.api_key + self.base_url = model_config.base_url + self.litellm_input_model_name = model_config.litellm_input_model_name + + # Track the actual model name from LiteLLM responses + self.litellm_run_model_name = None # Initialize managers using the factory pattern (simplified) self.task_manager = MCPServiceFactory.create_task_manager(mcp_service) @@ -53,23 +54,28 @@ def __init__( # automatically refresh its service configuration from the state # manager before each execution, so per-task manual updates are no # longer needed. 
- self.agent = MCPAgent( - model_name=self.actual_model_name, + self.agent = MCPMarkAgent( + litellm_input_model_name=self.litellm_input_model_name, # Use the original model name for detection api_key=self.api_key, base_url=self.base_url, mcp_service=mcp_service, timeout=timeout, service_config=self.service_config, service_config_provider=self.state_manager.get_service_config_for_agent, - stream=stream, + reasoning_effort=self.reasoning_effort, ) # Initialize results reporter self.results_reporter = ResultsReporter() # Output directory handling - model_slug = self.model.replace(".", "-") - self.base_experiment_dir = output_dir / exp_name / f"{mcp_service}__{model_slug}" + if self.reasoning_effort != "default": + model_slug = self.model_name.replace(".", "-") + "-" + self.reasoning_effort + else: + model_slug = self.model_name.replace(".", "-") + + service_for_dir = "playwright" if mcp_service == "playwright_webarena" else mcp_service + self.base_experiment_dir = output_dir / f"{model_slug}__{service_for_dir}" / exp_name self.base_experiment_dir.mkdir(parents=True, exist_ok=True) def _format_duration(self, seconds: float) -> str: @@ -216,6 +222,10 @@ def _run_single_task(self, task) -> TaskResult: ) agent_execution_time = time.time() - agent_execution_start_time + + # Extract actual model name from LiteLLM response + if agent_result.get("litellm_run_model_name"): + self.litellm_run_model_name = agent_result["litellm_run_model_name"] # Write messages.json to task_output_dir messages_path = task_output_dir / "messages.json" @@ -284,7 +294,7 @@ def run_evaluation(self, task_filter: str) -> EvaluationReport: retry_due_to_error = ( existing_result is not None and not existing_result.success - and existing_result.error_message in PIPELINE_RETRY_ERRORS + and is_retryable_error(existing_result.error_message) ) if existing_result and not retry_due_to_error: @@ -334,7 +344,9 @@ def run_evaluation(self, task_filter: str) -> EvaluationReport: meta_path = task_output_dir / "meta.json" model_config = { "mcp_service": self.mcp_service, - "model_name": self.actual_model_name, + "model_name": self.model_name, + "litellm_run_model_name": self.litellm_run_model_name, + "reasoning_effort": self.reasoning_effort, "timeout": self.timeout, } self.results_reporter.save_meta_json( @@ -375,10 +387,12 @@ def _matches_filter(tr: TaskResult, flt: str) -> bool: final_results = list(merged.values()) aggregated_report = EvaluationReport( - model_name=self.model, + model_name=self.model_name, model_config={ "mcp_service": self.mcp_service, - "model_name": self.actual_model_name, + "model_name": self.model_name, + "litellm_run_model_name": self.litellm_run_model_name, + "reasoning_effort": self.reasoning_effort, "timeout": self.timeout, }, total_tasks=len(final_results), diff --git a/src/model_config.py b/src/model_config.py index b51b378..ebad32f 100644 --- a/src/model_config.py +++ b/src/model_config.py @@ -28,123 +28,135 @@ class ModelConfig: "gpt-4o": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "gpt-4o", + "litellm_input_model_name": "openai/gpt-4o", }, "gpt-4.1": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "gpt-4.1", + "litellm_input_model_name": "openai/gpt-4.1", }, "gpt-4.1-mini": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "gpt-4.1-mini", + "litellm_input_model_name": "openai/gpt-4.1-mini", + }, + 
"gpt-4.1-nano": { + "provider": "openai", + "api_key_var": "OPENAI_API_KEY", + "litellm_input_model_name": "openai/gpt-4.1-nano", }, "gpt-5": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "gpt-5", + "litellm_input_model_name": "openai/gpt-5", }, "gpt-5-mini": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "gpt-5-mini", + "litellm_input_model_name": "openai/gpt-5-mini", }, "gpt-5-nano": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "gpt-5-nano", + "litellm_input_model_name": "openai/gpt-5-nano", }, "o3": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "o3", + "litellm_input_model_name": "openai/o3", }, "o4-mini": { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": "o4-mini", + "litellm_input_model_name": "openai/o4-mini", + }, + "gpt-oss-120b": { + "provider": "openai", + "api_key_var": "OPENROUTER_API_KEY", + "litellm_input_model_name": "openrouter/openai/gpt-oss-120b", }, # DeepSeek models "deepseek-chat": { "provider": "deepseek", "api_key_var": "DEEPSEEK_API_KEY", - "base_url_var": "DEEPSEEK_BASE_URL", - "actual_model_name": "deepseek-chat", + "litellm_input_model_name": "deepseek/deepseek-chat", }, "deepseek-reasoner": { "provider": "deepseek", "api_key_var": "DEEPSEEK_API_KEY", - "base_url_var": "DEEPSEEK_BASE_URL", - "actual_model_name": "deepseek-reasoner", + "litellm_input_model_name": "deepseek/deepseek-reasoner", }, # Anthropic models - "claude-3-7-sonnet": { + "claude-3.7-sonnet": { "provider": "anthropic", "api_key_var": "ANTHROPIC_API_KEY", - "base_url_var": "ANTHROPIC_BASE_URL", - "actual_model_name": "claude-3-7-sonnet-20250219", + "litellm_input_model_name": "anthropic/claude-3-7-sonnet-20250219", }, - "claude-4-sonnet": { + "claude-sonnet-4": { "provider": "anthropic", "api_key_var": "ANTHROPIC_API_KEY", - "base_url_var": "ANTHROPIC_BASE_URL", - "actual_model_name": "claude-sonnet-4-20250514", + "litellm_input_model_name": "anthropic/claude-sonnet-4-20250514", }, - "claude-4-opus": { + "claude-opus-4": { "provider": "anthropic", "api_key_var": "ANTHROPIC_API_KEY", - "base_url_var": "ANTHROPIC_BASE_URL", - "actual_model_name": "claude-opus-4-20250514", + "litellm_input_model_name": "anthropic/claude-opus-4-20250514", }, - "claude-4.1-opus": { + "claude-opus-4.1": { "provider": "anthropic", "api_key_var": "ANTHROPIC_API_KEY", - "base_url_var": "ANTHROPIC_BASE_URL", - "actual_model_name": "claude-opus-4-1-20250805", + "litellm_input_model_name": "anthropic/claude-opus-4-1-20250805", }, # Google models "gemini-2.5-pro": { "provider": "google", "api_key_var": "GEMINI_API_KEY", - "base_url_var": "GEMINI_BASE_URL", - "actual_model_name": "gemini-2.5-pro", + "litellm_input_model_name": "gemini/gemini-2.5-pro", }, "gemini-2.5-flash": { "provider": "google", "api_key_var": "GEMINI_API_KEY", - "base_url_var": "GEMINI_BASE_URL", - "actual_model_name": "gemini-2.5-flash", + "litellm_input_model_name": "gemini/gemini-2.5-flash", }, # Moonshot models "k2": { "provider": "moonshot", "api_key_var": "MOONSHOT_API_KEY", - "base_url_var": "MOONSHOT_BASE_URL", - "actual_model_name": "kimi-k2-0711-preview", + "litellm_input_model_name": "moonshot/kimi-k2-0711-preview", + }, + "k2-turbo": { + "provider": "moonshot", + "api_key_var": "MOONSHOT_API_KEY", + 
"litellm_input_model_name": "moonshot/kimi-k2-turbo-preview", }, # Grok models "grok-4": { "provider": "xai", "api_key_var": "GROK_API_KEY", - "base_url_var": "GROK_BASE_URL", - "actual_model_name": "grok-4-0709", + "litellm_input_model_name": "xai/grok-4-0709", + }, + "grok-code-fast-1": { + "provider": "xai", + "api_key_var": "GROK_API_KEY", + "litellm_input_model_name": "xai/grok-code-fast-1", }, # Qwen models "qwen-3-coder": { "provider": "qwen", - "api_key_var": "QWEN_API_KEY", - "base_url_var": "QWEN_BASE_URL", - "actual_model_name": "qwen/qwen3-coder", + "api_key_var": "OPENROUTER_API_KEY", + "litellm_input_model_name": "openrouter/qwen/qwen3-coder", }, + "qwen-3-coder-plus": { + "provider": "qwen", + "api_key_var": "DASHSCOPE_API_KEY", + "litellm_input_model_name": "dashscope/qwen3-coder-plus", + }, + # Zhipu + "glm-4.5": { + "provider": "zhipu", + "api_key_var": "OPENROUTER_API_KEY", + "litellm_input_model_name": "openrouter/z-ai/glm-4.5", + } } def __init__(self, model_name: str): @@ -157,25 +169,22 @@ def __init__(self, model_name: str): Raises: ValueError: If the model is not supported or environment variables are missing. """ - self.model_name = model_name + self.short_model_name = model_name model_info = self._get_model_info(model_name) - # Load API key and base URL from environment variables + # Load API key, base URL and LiteLLM model name from environment variables + if "base_url_var" in model_info: + self.base_url = os.getenv(model_info["base_url_var"]) + else: + self.base_url = None + self.api_key = os.getenv(model_info["api_key_var"]) if not self.api_key: raise ValueError( f"Missing required environment variable: {model_info['api_key_var']}" ) - self.base_url = os.getenv(model_info["base_url_var"]) - if not self.base_url: - raise ValueError( - f"Missing required environment variable: {model_info['base_url_var']}" - ) - - # Store provider and the actual model name for the API - self.provider = model_info["provider"] - self.actual_model_name = model_info.get("actual_model_name", model_name) + self.litellm_input_model_name = model_info.get("litellm_input_model_name", model_name) def _get_model_info(self, model_name: str) -> Dict[str, str]: """ @@ -190,8 +199,7 @@ def _get_model_info(self, model_name: str) -> Dict[str, str]: return { "provider": "openai", "api_key_var": "OPENAI_API_KEY", - "base_url_var": "OPENAI_BASE_URL", - "actual_model_name": model_name, + "litellm_input_model_name": model_name, } return self.MODEL_CONFIGS[model_name] @@ -209,10 +217,8 @@ def main(): # Example: Create a model config for DeepSeek model_config = ModelConfig("deepseek-chat") logger.info("✅ DeepSeek model config created successfully.") - logger.info("Provider: %s", model_config.provider) - logger.info("Actual model name: %s", model_config.actual_model_name) + logger.info("Short model name: %s", model_config.short_model_name) logger.info("API key loaded: %s", bool(model_config.api_key)) - logger.info("Base URL: %s", model_config.base_url) except ValueError as e: logger.error("⚠️ Configuration error: %s", e) diff --git a/src/results_reporter.py b/src/results_reporter.py index 40ec5da..3d8fa55 100644 --- a/src/results_reporter.py +++ b/src/results_reporter.py @@ -82,7 +82,7 @@ def total_input_tokens(self) -> int: total = 0 for result in self.task_results: if result.token_usage: - total += result.token_usage.get("input_tokens", 0) + total += (result.token_usage.get("input_tokens") or 0) return total @property @@ -91,7 +91,7 @@ def total_output_tokens(self) -> int: total = 0 for result in 
self.task_results: if result.token_usage: - total += result.token_usage.get("output_tokens", 0) + total += (result.token_usage.get("output_tokens") or 0) return total @property @@ -100,7 +100,16 @@ def total_tokens(self) -> int: total = 0 for result in self.task_results: if result.token_usage: - total += result.token_usage.get("total_tokens", 0) + total += (result.token_usage.get("total_tokens") or 0) + return total + + @property + def total_reasoning_tokens(self) -> int: + """Calculate total reasoning tokens across all tasks.""" + total = 0 + for result in self.task_results: + if result.token_usage: + total += (result.token_usage.get("reasoning_tokens") or 0) return total @property @@ -123,6 +132,13 @@ def avg_total_tokens(self) -> float: if self.total_tasks == 0: return 0.0 return self.total_tokens / self.total_tasks + + @property + def avg_reasoning_tokens(self) -> float: + """Calculate average reasoning tokens per task.""" + if self.total_tasks == 0: + return 0.0 + return self.total_reasoning_tokens / self.total_tasks @property def total_task_execution_time(self) -> float: @@ -155,9 +171,11 @@ def get_category_stats(self) -> Dict[str, Dict[str, Any]]: "total_input_tokens": 0, "total_output_tokens": 0, "total_tokens": 0, + "total_reasoning_tokens": 0, "avg_input_tokens": 0.0, "avg_output_tokens": 0.0, "avg_total_tokens": 0.0, + "avg_reasoning_tokens": 0.0, "total_turns": 0, "avg_turns": 0.0, } @@ -171,14 +189,17 @@ def get_category_stats(self) -> Dict[str, Dict[str, Any]]: # Add token and turn usage if result.token_usage: category_stats[category]["total_input_tokens"] += ( - result.token_usage.get("input_tokens", 0) + result.token_usage.get("input_tokens") or 0 ) category_stats[category]["total_output_tokens"] += ( - result.token_usage.get("output_tokens", 0) + result.token_usage.get("output_tokens") or 0 ) - category_stats[category]["total_tokens"] += result.token_usage.get( - "total_tokens", 0 + category_stats[category]["total_tokens"] += ( + result.token_usage.get("total_tokens") or 0 ) + category_stats[category]["total_reasoning_tokens"] += result.token_usage.get( + "reasoning_tokens", 0 + ) or 0 # Accumulate turns if result.turn_count is not None: @@ -206,6 +227,7 @@ def get_category_stats(self) -> Dict[str, Dict[str, Any]]: stats["total_output_tokens"] / stats["total"] ) stats["avg_total_tokens"] = stats["total_tokens"] / stats["total"] + stats["avg_reasoning_tokens"] = stats["total_reasoning_tokens"] / stats["total"] stats["avg_turns"] = ( stats["total_turns"] / stats["total"] if stats["total"] > 0 else 0 @@ -241,7 +263,9 @@ def save_meta_json( meta_data = { "task_name": task_result.task_name, - "model": model_config.get("model_name", "unknown"), + "model_name": model_config.get("model_name", "unknown"), + "litellm_run_model_name": model_config.get("litellm_run_model_name"), + "reasoning_effort": model_config.get("reasoning_effort"), "mcp": model_config.get("mcp_service", "unknown"), "timeout": model_config.get("timeout", 300), "time": {"start": start_time.isoformat(), "end": end_time.isoformat()}, @@ -272,7 +296,8 @@ def save_model_summary(self, report: EvaluationReport, output_path: Path) -> Pat avg_turns = total_turns / report.total_tasks if report.total_tasks > 0 else 0 summary = { - "model": report.model_name, + "model_name": report.model_name, + "model_config": report.model_config, "total_tasks": report.total_tasks, "successful_tasks": report.successful_tasks, "failed_tasks": report.failed_tasks, @@ -289,9 +314,11 @@ def save_model_summary(self, report: EvaluationReport, 
output_path: Path) -> Pat "total_input_tokens": report.total_input_tokens, "total_output_tokens": report.total_output_tokens, "total_tokens": report.total_tokens, + "total_reasoning_tokens": report.total_reasoning_tokens, "avg_input_tokens": round(report.avg_input_tokens, 2), "avg_output_tokens": round(report.avg_output_tokens, 2), "avg_total_tokens": round(report.avg_total_tokens, 2), + "avg_reasoning_tokens": round(report.avg_reasoning_tokens, 2), }, "turn_usage": { "total_turns": total_turns, @@ -306,9 +333,11 @@ def save_model_summary(self, report: EvaluationReport, output_path: Path) -> Pat "total_input": stats["total_input_tokens"], "total_output": stats["total_output_tokens"], "total": stats["total_tokens"], + "total_reasoning": stats["total_reasoning_tokens"], "avg_input": round(stats["avg_input_tokens"], 2), "avg_output": round(stats["avg_output_tokens"], 2), "avg_total": round(stats["avg_total_tokens"], 2), + "avg_reasoning": round(stats["avg_reasoning_tokens"], 2), }, "turn_usage": { "total_turns": stats["total_turns"], diff --git a/tasks/github/mcpmark-cicd/deployment_status_workflow/verify.py b/tasks/github/mcpmark-cicd/deployment_status_workflow/verify.py index e14782b..02b9d39 100644 --- a/tasks/github/mcpmark-cicd/deployment_status_workflow/verify.py +++ b/tasks/github/mcpmark-cicd/deployment_status_workflow/verify.py @@ -44,7 +44,7 @@ def _search_github_issues( def _wait_for_workflow_completion( - headers: Dict[str, str], owner: str, repo: str, max_wait: int = 300 + headers: Dict[str, str], owner: str, repo: str, max_wait: int = 90 ) -> bool: """Wait for GitHub Actions workflows to complete processing.""" print("⏳ Waiting for deployment status workflows to complete...") diff --git a/tasks/github/mcpmark-cicd/issue_management_workflow/verify.py b/tasks/github/mcpmark-cicd/issue_management_workflow/verify.py index dbae06f..fa0a1dc 100644 --- a/tasks/github/mcpmark-cicd/issue_management_workflow/verify.py +++ b/tasks/github/mcpmark-cicd/issue_management_workflow/verify.py @@ -44,7 +44,7 @@ def _search_github_issues( def _wait_for_workflow_completion( - headers: Dict[str, str], owner: str, repo: str, max_wait: int = 180 + headers: Dict[str, str], owner: str, repo: str, max_wait: int = 90 ) -> bool: """Wait for GitHub Actions workflows to complete processing.""" print("⏳ Waiting for GitHub Actions workflows to complete...") diff --git a/tasks/github/mcpmark-cicd/linting_ci_workflow/verify.py b/tasks/github/mcpmark-cicd/linting_ci_workflow/verify.py index 2d4ec1a..6a0fcc5 100644 --- a/tasks/github/mcpmark-cicd/linting_ci_workflow/verify.py +++ b/tasks/github/mcpmark-cicd/linting_ci_workflow/verify.py @@ -286,7 +286,7 @@ def verify() -> bool: second_commit_runs = [] start_time = time.time() - timeout = 120 + timeout = 90 no_workflow_check_count = 0 while time.time() - start_time < timeout: diff --git a/tasks/github/mcpmark-cicd/pr_automation_workflow/verify.py b/tasks/github/mcpmark-cicd/pr_automation_workflow/verify.py index d5c9b71..7c6c24a 100644 --- a/tasks/github/mcpmark-cicd/pr_automation_workflow/verify.py +++ b/tasks/github/mcpmark-cicd/pr_automation_workflow/verify.py @@ -108,7 +108,7 @@ def _wait_for_workflow_completion( owner: str, repo: str, workflow_file: str, - max_wait: int = 600, + max_wait: int = 90, ) -> bool: """Wait for GitHub Actions workflows to complete processing.""" print(f"⏳ Waiting for {workflow_file} workflows to complete...") @@ -649,7 +649,7 @@ def _run_unit_tests( # Wait for workflows to complete _wait_for_workflow_completion( - headers, owner, 
repo, "pr-automation.yml", max_wait=300 + headers, owner, repo, "pr-automation.yml", max_wait=90 ) # Verify each test PR failed appropriately diff --git a/tasks/notion/it_trouble_shooting_hub/verification_expired_update/meta.json b/tasks/notion/it_trouble_shooting_hub/verification_expired_update/meta.json index c09a53d..a4602df 100644 --- a/tasks/notion/it_trouble_shooting_hub/verification_expired_update/meta.json +++ b/tasks/notion/it_trouble_shooting_hub/verification_expired_update/meta.json @@ -21,6 +21,6 @@ "stateType": "url", "stateContent": null, "stateUrl": "https://painted-tennis-ebc.notion.site/It-Trouble-Shooting-Hub-23e81626b6d78020aba7eb65ae1cc2d5", - "stateOriginalUrl": "https://www.notion.so/marketplace/templates/japantravelplanner101" + "stateOriginalUrl": "https://www.notion.so/marketplace/templates/it-trouble-shooting-hub" } } \ No newline at end of file