diff --git a/main.py b/main.py index eaa5d81e..f0bb018c 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import utils.calculate_average_score import utils.calculate_score_from_log import common_benchmark +import run_benchmark_with_monitor import dotenv import utils.eval_answer_from_log import fire @@ -35,6 +36,7 @@ def print_config(*args): "print-config": print_config, "trace": utils.trace_single_task.main, "common-benchmark": common_benchmark.main, + "run-benchmark-with-monitor": run_benchmark_with_monitor.main, "eval-answer": utils.eval_answer_from_log.main, "avg-score": utils.calculate_average_score.main, "score-from-log": utils.calculate_score_from_log.main, diff --git a/monitor_guide.md b/monitor_guide.md new file mode 100644 index 00000000..5961bb51 --- /dev/null +++ b/monitor_guide.md @@ -0,0 +1,75 @@ +# Web Monitoring Guide for Benchmark Evaluation + +This document provides guidance for using the web monitoring dashboard while evaluating benchmarks with MiroFlow. + +## Overview + +The web monitoring system provides real-time progress tracking, statistics, and task reports through a web interface. It runs alongside the benchmark evaluation process. + +## Architecture + +```txt +run_benchmark_with_monitor.py (Wrapper) + ├─> Process 1: common_benchmark.py (Executor) + │ └─> Executes tasks and generates log files + │ + └─> Process 2: benchmark_monitor.py (Monitor) + └─> Reads log files and displays monitoring interface + └─> Generates task reports via generate_benchmark_report.py +``` + +## Features + +- **Real-time Dashboard**: Monitor progress, statistics, and task status in real-time +- **Web Interface**: Access dashboard at `http://localhost:8080` (or next available port) +- **Task Reports**: View detailed reports for individual tasks +- **Benchmark-Specific Metrics**: Tailored statistics for different benchmark types (GAIA, FutureX, FinSearchComp, xBench) +- **Auto-refresh**: Dashboard updates automatically every 30 seconds + +## Supported Benchmarks + +`run_benchmark_with_monitor.py` currently supports the following benchmark evaluations: + +- **GAIA Validation** +- **FutureX** +- **FinSearchComp** +- **xBench-DeepSearch** + +## Usage Examples + +#### GAIA Benchmark + +```bash +uv run main.py run-benchmark-with-monitor \ + --config_file_name=agent_gaia-validation_claude37sonnet \ + --output_dir="logs/gaia-validation-claude37sonnet/$(date +"%Y%m%d_%H%M")" +``` + +#### FutureX Benchmark + +```bash +uv run main.py run-benchmark-with-monitor \ + --config_file_name=agent_quickstart_reading \ + benchmark=futurex \ + --output_dir="logs/futurex/$(date +"%Y%m%d_%H%M")" +``` + +#### FinSearchComp Benchmark + +```bash +uv run main.py run-benchmark-with-monitor \ + --config_file_name=agent_finsearchcomp_claude37sonnet \ + --output_dir="logs/finsearchcomp-claude37sonnet/$(date +"%Y%m%d_%H%M")" +``` + +#### xBench-DeepSearch Benchmark + +```bash +uv run main.py run-benchmark-with-monitor \ + --config_file_name=agent_xbench-ds_claude37sonnet \ + benchmark=xbench-ds \ + --output_dir="logs/xbench-ds/$(date +"%Y%m%d_%H%M")" +``` + +💡 To resume an interrupted evaluation, simply replace the output directory with an existing log directory. 
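+
+For example, to resume a GAIA run, point `--output_dir` at the directory that already contains the partial logs (the timestamped path below is only an illustration; use your actual run directory):
+
+```bash
+uv run main.py run-benchmark-with-monitor \
+    --config_file_name=agent_gaia-validation_claude37sonnet \
+    --output_dir="logs/gaia-validation-claude37sonnet/20250101_0900"
+```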
+ diff --git a/run_benchmark_with_monitor.py b/run_benchmark_with_monitor.py new file mode 100644 index 00000000..edc1da97 --- /dev/null +++ b/run_benchmark_with_monitor.py @@ -0,0 +1,132 @@ +# SPDX-FileCopyrightText: 2025 MiromindAI +# +# SPDX-License-Identifier: Apache-2.0 + +import os +import subprocess +import signal +import sys +import time +from typing import Optional + + +def main(*args, config_file_name: str = "", output_dir: str = "", web_port: int = 8080): + """Run benchmark with integrated web monitoring""" + + # Validate required arguments + if not output_dir: + print("Error: output_dir is required") + print( + "Usage: uv run main.py run-benchmark-with-monitor --config_file_name=name --output_dir=path" + ) + return 1 + + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + print("=" * 50) + print("Benchmark Runner with Monitor") + print("=" * 50) + print(f"Output directory: {output_dir}") + print(f"Config name: {config_file_name}") + print(f"Web port: {web_port}") + print("=" * 50) + + # Global variables for process management + benchmark_process: Optional[subprocess.Popen] = None + monitor_process: Optional[subprocess.Popen] = None + + def cleanup_processes(): + """Clean up running processes""" + print("\nShutting down processes...") + + if benchmark_process and benchmark_process.poll() is None: + print(f"Stopping benchmark (PID: {benchmark_process.pid})...") + benchmark_process.terminate() + try: + benchmark_process.wait(timeout=5) + except subprocess.TimeoutExpired: + benchmark_process.kill() + + if monitor_process and monitor_process.poll() is None: + print(f"Stopping monitor (PID: {monitor_process.pid})...") + monitor_process.terminate() + try: + monitor_process.wait(timeout=5) + except subprocess.TimeoutExpired: + monitor_process.kill() + + print("Cleanup complete.") + + def signal_handler(signum, frame): + """Handle Ctrl+C gracefully""" + cleanup_processes() + sys.exit(0) + + # Set up signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # Start benchmark + print("Starting benchmark...") + benchmark_cmd = [ + "uv", + "run", + "main.py", + "common-benchmark", + f"--config_file_name={config_file_name}", + f"output_dir={output_dir}", + ] + # Add any additional arguments (e.g., benchmark=futurex) + benchmark_cmd.extend(list(args)) + benchmark_process = subprocess.Popen(benchmark_cmd) + print(f"Benchmark started with PID: {benchmark_process.pid}") + + # Wait a moment for benchmark to initialize + time.sleep(3) + + # Start monitor + print("Starting web monitor...") + monitor_cmd = [ + "uv", + "run", + "utils/progress_check/benchmark_monitor.py", + output_dir, + f"--web-port={web_port}", + ] + monitor_process = subprocess.Popen(monitor_cmd) + print(f"Monitor started with PID: {monitor_process.pid}") + print(f"Web dashboard available at: http://localhost:{web_port}") + + print("\n" + "=" * 50) + print("Both processes are running!") + print("Press Ctrl+C to stop both processes") + print("Monitor will continue running even if benchmark finishes") + print("=" * 50) + + # Monitor the processes + while True: + time.sleep(5) + + # Check if benchmark process is still running + if benchmark_process and benchmark_process.poll() is not None: + print("Benchmark process ended") + benchmark_process = None + + # Check if monitor process is still running + if monitor_process and monitor_process.poll() is not None: + print("Monitor process died unexpectedly. 
Restarting...") + monitor_process = subprocess.Popen(monitor_cmd) + print(f"Monitor restarted with PID: {monitor_process.pid}") + + except KeyboardInterrupt: + cleanup_processes() + + return 0 + + +if __name__ == "__main__": + import fire + + fire.Fire(main) diff --git a/utils/progress_check/benchmark_monitor.py b/utils/progress_check/benchmark_monitor.py new file mode 100644 index 00000000..8a106afd --- /dev/null +++ b/utils/progress_check/benchmark_monitor.py @@ -0,0 +1,1008 @@ +""" +Benchmark Monitor with Web Interface + +This script provides monitoring capabilities for any benchmark including: +- Real-time web dashboard +- Historical data tracking +- Progress monitoring + +Usage: + uv run utils/progress_check/benchmark_monitor.py [LOG_FOLDER_PATH] [OPTIONS] + +Options: + --web-port PORT Web interface port (default: 8080) +""" + +import json +import time +import argparse +from pathlib import Path +from typing import Dict, List, Any, Optional +from datetime import datetime +import threading +import os +import socket +import re +from http.server import HTTPServer, BaseHTTPRequestHandler +from omegaconf import OmegaConf + + +class WebDashboard: + """Simple web dashboard for monitoring""" + + def __init__(self, monitor, port: int = 8080): + self.monitor = monitor + self.port = port + self.server = None + self.benchmark_name = monitor.benchmark_name + + def _is_port_available(self, port: int) -> bool: + """Check if a port is available""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("localhost", port)) + return True + except OSError: + return False + + def _find_available_port(self, start_port: int, max_attempts: int = 100) -> int: + """Find an available port starting from start_port""" + port = start_port + for _ in range(max_attempts): + if self._is_port_available(port): + return port + port += 1 + raise RuntimeError(f"Could not find an available port after {max_attempts} attempts") + + def start_server(self): + """Start the web server with automatic port conflict resolution""" + handler = self.create_handler() + + # Find an available port, starting from the requested port + actual_port = self._find_available_port(self.port) + + # Update self.port to reflect the actual port being used + if actual_port != self.port: + print(f"Port {self.port} is already in use, using port {actual_port} instead") + self.port = actual_port + + self.server = HTTPServer(("localhost", self.port), handler) + print(f"Web dashboard available at: http://localhost:{self.port}") + + def run_server(): + self.server.serve_forever() + + thread = threading.Thread(target=run_server, daemon=True) + thread.start() + + def create_handler(self): + """Create HTTP request handler""" + monitor = self.monitor + + class DashboardHandler(BaseHTTPRequestHandler): + def log_message(self, format, *args): + """Override to suppress HTTP request logs""" + pass + + def do_GET(self): + if self.path == "/": + self.send_dashboard() + elif self.path == "/api/status": + self.send_json(monitor.get_status_json()) + elif self.path == "/api/tasks": + self.send_json(monitor.get_tasks_json()) + elif self.path.startswith("/api/task-report/"): + task_id = self.path.split("/")[-1] + self.send_task_report(task_id) + else: + self.send_error(404) + + def send_dashboard(self): + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + + html = self.generate_dashboard_html() + self.wfile.write(html.encode()) + + def send_json(self, data): + self.send_response(200) + 
self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(data, default=str).encode()) + + def send_task_report(self, task_id): + """Send task report for a specific task""" + try: + # Try to find the task in the current running tasks + task_info = monitor.get_task_info(task_id) + if not task_info: + self.send_error(404, "Task not found") + return + + # Generate report using the standalone report generator + report_content = monitor.generate_task_report(task_id) + if not report_content: + self.send_error(500, "Failed to generate report") + return + + self.send_response(200) + self.send_header("Content-type", "text/plain; charset=utf-8") + self.end_headers() + self.wfile.write(report_content.encode("utf-8")) + + except Exception as e: + self.send_error(500, f"Error generating report: {str(e)}") + + def generate_dashboard_html(self): + benchmark_name = monitor.benchmark_name + return f""" + + + + {benchmark_name} Monitor Dashboard + + + + + + +
+            <body>
+                <!-- Dashboard body: heading, overall progress bar, key-metric counters and a
+                     recent-task list, populated from /api/status and /api/tasks and refreshed
+                     every 30 seconds. -->
+                <h1>{benchmark_name} Monitor Dashboard</h1>
+
+                <h2>Overall Progress</h2>
+                <div style="background:#eee;width:100%;height:20px;">
+                    <div id="progress-fill" style="background:#4caf50;width:0%;height:20px;"></div>
+                </div>
+                <p id="progress-text">Progress: 0%</p>
+
+                <h2>Key Metrics</h2>
+                <p><span id="total-tasks">0</span> Total Tasks</p>
+                <p><span id="completed-tasks">0</span> Completed</p>
+                <p><span id="running-tasks">0</span> Running</p>
+                <p><span id="failed-tasks">0</span> Failed</p>
+
+                <h2>Recent Tasks</h2>
+                <div id="task-list">Loading...</div>
+
+                <script>
+                    async function refresh() {{
+                        const status = await (await fetch('/api/status')).json();
+                        document.getElementById('total-tasks').textContent = status.total_tasks;
+                        document.getElementById('completed-tasks').textContent = status.completed_tasks;
+                        document.getElementById('running-tasks').textContent = status.running_tasks;
+                        document.getElementById('failed-tasks').textContent = status.failed_tasks;
+                        document.getElementById('progress-fill').style.width = status.progress_pct + '%';
+                        document.getElementById('progress-text').textContent =
+                            'Progress: ' + status.progress_pct.toFixed(1) + '%';
+
+                        const tasks = await (await fetch('/api/tasks')).json();
+                        document.getElementById('task-list').innerHTML = tasks.slice(0, 20).map(
+                            t => `<div>${{t.task_id}} [${{t.status}}] <a href="/api/task-report/${{t.task_id}}">report</a></div>`
+                        ).join('');
+                    }}
+                    refresh();
+                    setInterval(refresh, 30000);
+                </script>
+            </body>
+            </html>
+ + + """ + + return DashboardHandler + + +class BenchmarkMonitor: + """Generic benchmark monitor with web interface""" + + def __init__(self, log_folder: str): + self.log_folder = Path(log_folder) + self.start_time = datetime.now() + self.benchmark_name = self._detect_benchmark_name() + self.benchmark_type = self._detect_benchmark_type() + + # Initialize statistics based on benchmark type + self.stats = self._initialize_stats() + + self.tasks = {} + self.recent_activity = [] + + def _detect_benchmark_name(self) -> str: + """Detect benchmark name from log folder path or config file""" + # Try to get from .hydra/config.yaml first + hydra_config_path = self.log_folder / ".hydra" / "config.yaml" + if hydra_config_path.exists(): + try: + cfg = OmegaConf.load(hydra_config_path) + benchmark_name = cfg.get("benchmark", {}).get("name", "") + if benchmark_name: + return self._format_benchmark_name(benchmark_name) + except Exception: + pass + + # Try to extract from path (e.g., logs/gaia/... -> GAIA) + path_parts = self.log_folder.parts + if "logs" in path_parts: + idx = path_parts.index("logs") + if idx + 1 < len(path_parts): + benchmark_name = path_parts[idx + 1] + return self._format_benchmark_name(benchmark_name) + + # Default fallback + return "Benchmark" + + def _format_benchmark_name(self, name: str) -> str: + """Format benchmark name to a friendly display format""" + name_lower = name.lower().replace("-", "").replace("_", "") + + # Map common benchmark names to their preferred display format + name_mapping = { + "finsearchcomp": "FinSearchComp", + "futurex": "FutureX", + "future-x": "FutureX", + "gaia": "GAIA", + "xbench": "xbench", + "x-bench": "xbench", + "browsecomp": "BrowseComp", + "browsecomp-zh": "BrowseComp-ZH", + } + + # Check exact match first + if name_lower in name_mapping: + return name_mapping[name_lower] + + # Check partial match (e.g., "finsearchcomp-claude" -> "FinSearchComp") + for key, value in name_mapping.items(): + if name_lower.startswith(key): + return value + + # Default: convert to title case (e.g., "example_dataset" -> "Example Dataset") + return name.replace("-", " ").replace("_", " ").title() + + def _detect_benchmark_type(self) -> str: + """Detect benchmark type to determine statistics logic""" + name_lower = self.benchmark_name.lower() + + if "gaia" in name_lower: + return "gaia" # Has ground truth, needs correctness evaluation + elif "futurex" in name_lower or "future-x" in name_lower: + return "futurex" # No ground truth, prediction-focused + elif "xbench" in name_lower or "x-bench" in name_lower: + return "xbench" # No ground truth, prediction-focused + elif "finsearchcomp" in name_lower or "finsearch-comp" in name_lower: + return "finsearchcomp" # Has ground truth, needs task type breakdown + else: + return "default" # Default: assume has ground truth + + def _initialize_stats(self) -> Dict[str, Any]: + """Initialize statistics based on benchmark type""" + base_stats = { + "total_tasks": 0, + "completed_tasks": 0, + "running_tasks": 0, + "failed_tasks": 0, + "execution_times": [], + "error_types": {}, + "task_types": {}, + "last_update": None, + } + + if self.benchmark_type == "gaia": + # GAIA: correctness evaluation + base_stats.update({ + "correct_answers": 0, + "incorrect_answers": 0, + }) + elif self.benchmark_type in ["futurex", "xbench"]: + # FutureX/xbench: prediction-focused + base_stats.update({ + "with_predictions": 0, + "without_predictions": 0, + "with_errors": 0, + }) + elif self.benchmark_type == "finsearchcomp": + # FinSearchComp: task type 
and regional breakdown (like check_finsearchcomp_progress.py) + base_stats.update({ + "correct_answers": 0, # T2+T3 only + "incorrect_answers": 0, # T2+T3 only + "task_type_breakdown": { + "T1": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + "T2": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + "T3": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + "Unknown": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + }, + "regional_breakdown": { + "Global": { + "T2": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + "T3": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + }, + "Greater China": { + "T2": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + "T3": {"total": 0, "completed": 0, "correct": 0, "incorrect": 0}, + }, + }, + }) + else: + # Default: assume has ground truth + base_stats.update({ + "correct_answers": 0, + "incorrect_answers": 0, + }) + + return base_stats + + def scan_log_files(self) -> List[Path]: + """Scan for all task log files""" + if not self.log_folder.exists(): + return [] + return sorted( + self.log_folder.glob("task_*_attempt_*.json"), + key=lambda x: x.stat().st_mtime, + reverse=True, + ) + + def parse_task_file(self, file_path: Path) -> Optional[Dict[str, Any]]: + """Parse a single task log file""" + try: + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError, KeyError): + return None + + def extract_task_info( + self, data: Dict[str, Any], file_path: Path + ) -> Dict[str, Any]: + """Extract relevant information from task data""" + task_id = data.get("task_id", "unknown") + status = data.get("status", "unknown").lower() + judge_result = data.get("judge_result", "").upper() + final_answer = data.get("final_boxed_answer", "") + error_msg = data.get("error", "") + + # Extract attempt number from filename (e.g., task_xxx_attempt_1.json -> 1) + attempt = 1 # Default + match = re.search(r"_attempt_(\d+)\.json$", str(file_path)) + if match: + attempt = int(match.group(1)) + + # Extract execution time + start_time = data.get("start_time") + end_time = data.get("end_time") + execution_time = None + + if start_time and end_time: + try: + start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(end_time.replace("Z", "+00:00")) + execution_time = (end_dt - start_dt).total_seconds() + except Exception: + pass + + # Extract task type from metadata or task_id + task_type = "" + metadata = data.get("metadata", {}) + if isinstance(metadata, dict): + # Try to get task type from various metadata fields + if "Level" in metadata: + task_type = f"Level {metadata['Level']}" + elif "task_type" in metadata: + task_type = str(metadata["task_type"]) + elif "type" in metadata: + task_type = str(metadata["type"]) + elif "difficulty" in metadata: + task_type = f"Difficulty {metadata['difficulty']}" + + # For FinSearchComp, extract task type from task_id (e.g., "(T1)Time_Sensitive_Data_Fetching_006") + if self.benchmark_type == "finsearchcomp" and not task_type: + match = re.match(r"^\(T(\d+)\)", task_id) + if match: + task_type = f"T{match.group(1)}" + + # Extract region for FinSearchComp + region = "" + if self.benchmark_type == "finsearchcomp": + label = data.get("input", {}).get("metadata", {}).get("label", "") + if "(Global)" in label: + region = "Global" + elif "(Greater China)" in label: + region = "Greater China" + + return { + "task_id": task_id, + "file_path": str(file_path), + "status": 
status, + "judge_result": judge_result, + "final_answer": final_answer, + "error": error_msg, + "execution_time": execution_time, + "task_type": task_type, + "region": region, + "attempt": attempt, + "last_modified": file_path.stat().st_mtime, + } + + def update_statistics(self, task_info: Dict[str, Any]): + """Update monitoring statistics based on benchmark type""" + task_id = task_info["task_id"] + status = task_info["status"] + judge_result = task_info["judge_result"] + execution_time = task_info["execution_time"] + final_answer = task_info.get("final_answer", "") + error_msg = task_info.get("error", "") + task_type = task_info.get("task_type", "") + + # Update task tracking + if task_id not in self.tasks: + self.tasks[task_id] = task_info + self.stats["total_tasks"] += 1 + region = task_info.get("region", "") + self._update_stats_for_new_task(status, judge_result, final_answer, error_msg, task_type, region) + else: + # Update existing task - only update if status changed + old_status = self.tasks[task_id]["status"] + if old_status != status: + self.recent_activity.append( + { + "task_id": task_id, + "old_status": old_status, + "new_status": status, + "timestamp": datetime.now(), + } + ) + old_region = self.tasks[task_id].get("region", "") + new_region = task_info.get("region", "") + self._update_stats_for_status_change( + old_status, status, + self.tasks[task_id].get("judge_result", ""), + judge_result, + self.tasks[task_id].get("final_answer", ""), + final_answer, + self.tasks[task_id].get("error", ""), + error_msg, + task_type, + old_region, + new_region + ) + self.tasks[task_id] = task_info + + # Track execution times + if execution_time is not None: + self.stats["execution_times"].append(execution_time) + if len(self.stats["execution_times"]) > 100: + self.stats["execution_times"] = self.stats["execution_times"][-100:] + + def _update_stats_for_new_task(self, status: str, judge_result: str, + final_answer: str, error_msg: str, task_type: str, region: str = ""): + """Update statistics for a new task based on benchmark type (like check_finsearchcomp_progress.py)""" + if status == "completed": + self.stats["completed_tasks"] += 1 + + if self.benchmark_type == "gaia": + if judge_result == "CORRECT": + self.stats["correct_answers"] += 1 + elif judge_result in ["INCORRECT", "ERROR"]: + self.stats["incorrect_answers"] += 1 + elif self.benchmark_type in ["futurex", "xbench"]: + # For xbench/futurex: count predictions for all tasks (like check_xbench_progress.py) + # But prediction_rate is calculated as with_predictions / completed + pass # Predictions and errors are counted below for all statuses + elif self.benchmark_type == "finsearchcomp": + if task_type in ["T1", "T2", "T3", "Unknown"]: + self.stats["task_type_breakdown"][task_type]["completed"] += 1 + + # For T1 tasks, exclude from correctness evaluation (like check_finsearchcomp_progress.py) + # T1 tasks are considered "completed" but not evaluated for correctness due to outdated ground truth + if task_type == "T1": + pass # T1 tasks are excluded from correctness evaluation + elif task_type in ["T2", "T3"]: + # For T2 and T3 tasks, evaluate correctness (like check_finsearchcomp_progress.py) + # If judge_result is CORRECT, count as correct; otherwise (including NOT_ATTEMPTED) count as incorrect + if judge_result == "CORRECT": + self.stats["correct_answers"] += 1 + self.stats["task_type_breakdown"][task_type]["correct"] += 1 + # Update regional breakdown for correct T2 and T3 tasks + if region in ["Global", "Greater China"]: + 
self.stats["regional_breakdown"][region][task_type]["correct"] += 1 + else: + # All non-CORRECT results (including NOT_ATTEMPTED, INCORRECT, ERROR) count as incorrect + self.stats["incorrect_answers"] += 1 + self.stats["task_type_breakdown"][task_type]["incorrect"] += 1 + # Update regional breakdown for incorrect T2 and T3 tasks + if region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][region][task_type]["incorrect"] += 1 + else: # default + if judge_result == "CORRECT": + self.stats["correct_answers"] += 1 + elif judge_result in ["INCORRECT", "ERROR"]: + self.stats["incorrect_answers"] += 1 + elif status == "running": + self.stats["running_tasks"] += 1 + elif status in ["failed", "error", "interrupted"]: + self.stats["failed_tasks"] += 1 + + # For xbench/futurex: count predictions and errors for ALL tasks (like check_xbench_progress.py) + if self.benchmark_type in ["futurex", "xbench"]: + if final_answer and final_answer.strip(): + self.stats["with_predictions"] += 1 + else: + self.stats["without_predictions"] += 1 + if error_msg and error_msg.strip(): + self.stats["with_errors"] += 1 + + # Update task type breakdown for FinSearchComp + if self.benchmark_type == "finsearchcomp" and task_type: + if task_type in ["T1", "T2", "T3", "Unknown"]: + self.stats["task_type_breakdown"][task_type]["total"] += 1 + # Update regional breakdown for T2 and T3 tasks + if task_type in ["T2", "T3"] and region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][region][task_type]["total"] += 1 + if status == "completed": + self.stats["regional_breakdown"][region][task_type]["completed"] += 1 + + def _update_stats_for_status_change(self, old_status: str, new_status: str, + old_judge_result: str, new_judge_result: str, + old_final_answer: str, new_final_answer: str, + old_error: str, new_error: str, + task_type: str, old_region: str = "", new_region: str = ""): + """Update statistics when task status changes""" + # Decrease old status count + if old_status == "completed": + self.stats["completed_tasks"] -= 1 + if self.benchmark_type == "gaia": + if old_judge_result == "CORRECT": + self.stats["correct_answers"] -= 1 + elif old_judge_result in ["INCORRECT", "ERROR"]: + self.stats["incorrect_answers"] -= 1 + elif self.benchmark_type in ["futurex", "xbench"]: + # Predictions and errors are updated below for all statuses + pass + elif self.benchmark_type == "finsearchcomp": + if task_type in ["T1", "T2", "T3", "Unknown"]: + self.stats["task_type_breakdown"][task_type]["completed"] -= 1 + # For T1 tasks, exclude from correctness evaluation (like check_finsearchcomp_progress.py) + if task_type == "T1": + pass # T1 tasks are excluded from correctness evaluation + elif task_type in ["T2", "T3"]: + # Like check_finsearchcomp_progress.py: if CORRECT, count as correct; otherwise as incorrect + if old_judge_result == "CORRECT": + self.stats["correct_answers"] -= 1 + self.stats["task_type_breakdown"][task_type]["correct"] -= 1 + # Update regional breakdown for correct T2 and T3 tasks + if old_region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][old_region][task_type]["correct"] -= 1 + else: + # All non-CORRECT results count as incorrect + self.stats["incorrect_answers"] -= 1 + self.stats["task_type_breakdown"][task_type]["incorrect"] -= 1 + # Update regional breakdown for incorrect T2 and T3 tasks + if old_region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][old_region][task_type]["incorrect"] -= 1 + # Update regional breakdown for completed T2 
and T3 tasks + if old_region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][old_region][task_type]["completed"] -= 1 + else: # default + if old_judge_result == "CORRECT": + self.stats["correct_answers"] -= 1 + elif old_judge_result in ["INCORRECT", "ERROR"]: + self.stats["incorrect_answers"] -= 1 + elif old_status == "running": + self.stats["running_tasks"] -= 1 + elif old_status in ["failed", "error", "interrupted"]: + self.stats["failed_tasks"] -= 1 + + # Increase new status count + if new_status == "completed": + self.stats["completed_tasks"] += 1 + if self.benchmark_type == "gaia": + if new_judge_result == "CORRECT": + self.stats["correct_answers"] += 1 + elif new_judge_result in ["INCORRECT", "ERROR"]: + self.stats["incorrect_answers"] += 1 + elif self.benchmark_type in ["futurex", "xbench"]: + # Predictions and errors are updated below for all statuses + pass + elif self.benchmark_type == "finsearchcomp": + if task_type in ["T1", "T2", "T3", "Unknown"]: + self.stats["task_type_breakdown"][task_type]["completed"] += 1 + + # For T1 tasks, exclude from correctness evaluation (like check_finsearchcomp_progress.py) + # T1 tasks are considered "completed" but not evaluated for correctness due to outdated ground truth + if task_type == "T1": + pass # T1 tasks are excluded from correctness evaluation + elif task_type in ["T2", "T3"]: + # For T2 and T3 tasks, evaluate correctness (like check_finsearchcomp_progress.py) + # If judge_result is CORRECT, count as correct; otherwise (including NOT_ATTEMPTED) count as incorrect + if new_judge_result == "CORRECT": + self.stats["correct_answers"] += 1 + self.stats["task_type_breakdown"][task_type]["correct"] += 1 + # Update regional breakdown for correct T2 and T3 tasks + if new_region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][new_region][task_type]["correct"] += 1 + else: + # All non-CORRECT results (including NOT_ATTEMPTED, INCORRECT, ERROR) count as incorrect + self.stats["incorrect_answers"] += 1 + self.stats["task_type_breakdown"][task_type]["incorrect"] += 1 + # Update regional breakdown for incorrect T2 and T3 tasks + if new_region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][new_region][task_type]["incorrect"] += 1 + # Update regional breakdown for completed T2 and T3 tasks + if new_region in ["Global", "Greater China"]: + self.stats["regional_breakdown"][new_region][task_type]["completed"] += 1 + else: # default + if new_judge_result == "CORRECT": + self.stats["correct_answers"] += 1 + elif new_judge_result in ["INCORRECT", "ERROR"]: + self.stats["incorrect_answers"] += 1 + elif new_status == "running": + self.stats["running_tasks"] += 1 + elif new_status in ["failed", "error", "interrupted"]: + self.stats["failed_tasks"] += 1 + + # For xbench/futurex: update predictions and errors for ALL statuses (like check_xbench_progress.py) + if self.benchmark_type in ["futurex", "xbench"]: + # Decrease old counts + if old_final_answer and old_final_answer.strip(): + self.stats["with_predictions"] -= 1 + else: + self.stats["without_predictions"] -= 1 + if old_error and old_error.strip(): + self.stats["with_errors"] -= 1 + + # Increase new counts + if new_final_answer and new_final_answer.strip(): + self.stats["with_predictions"] += 1 + else: + self.stats["without_predictions"] += 1 + if new_error and new_error.strip(): + self.stats["with_errors"] += 1 + + def get_status_json(self) -> Dict[str, Any]: + """Get current status as JSON for web interface, based on benchmark type""" + total = 
self.stats["total_tasks"] + completed = self.stats["completed_tasks"] + running = self.stats["running_tasks"] + failed = self.stats["failed_tasks"] + + progress_pct = (completed / total * 100) if total > 0 else 0 + progress_pct = min(progress_pct, 100.0) # Cap at 100% + + exec_times = self.stats["execution_times"] + avg_execution_time = sum(exec_times) / len(exec_times) if exec_times else 0 + + elapsed_time = (datetime.now() - self.start_time).total_seconds() + tasks_per_second = completed / elapsed_time if elapsed_time > 0 else 0 + + result = { + "total_tasks": total, + "completed_tasks": completed, + "running_tasks": running, + "failed_tasks": failed, + "progress_pct": progress_pct, + "avg_execution_time": avg_execution_time, + "tasks_per_second": tasks_per_second, + "benchmark_type": self.benchmark_type, + "last_update": self.stats["last_update"].isoformat() + if self.stats["last_update"] + else None, + } + + # Add type-specific metrics + if self.benchmark_type == "gaia": + total_judged = self.stats["correct_answers"] + self.stats["incorrect_answers"] + accuracy = ( + (self.stats["correct_answers"] / total_judged * 100) + if total_judged > 0 + else 0 + ) + result.update({ + "correct_answers": self.stats["correct_answers"], + "incorrect_answers": self.stats["incorrect_answers"], + "accuracy": accuracy, + }) + elif self.benchmark_type in ["futurex", "xbench"]: + prediction_rate = ( + (self.stats["with_predictions"] / completed * 100) + if completed > 0 + else 0 + ) + result.update({ + "with_predictions": self.stats["with_predictions"], + "without_predictions": self.stats["without_predictions"], + "with_errors": self.stats["with_errors"], + "prediction_rate": prediction_rate, + }) + elif self.benchmark_type == "finsearchcomp": + t2_t3_completed = ( + self.stats["task_type_breakdown"]["T2"]["completed"] + + self.stats["task_type_breakdown"]["T3"]["completed"] + ) + t2_t3_correct = ( + self.stats["task_type_breakdown"]["T2"]["correct"] + + self.stats["task_type_breakdown"]["T3"]["correct"] + ) + accuracy = ( + (t2_t3_correct / t2_t3_completed * 100) + if t2_t3_completed > 0 + else 0 + ) + result.update({ + "correct_answers": self.stats["correct_answers"], # T2+T3 only + "incorrect_answers": self.stats["incorrect_answers"], # T2+T3 only + "accuracy": accuracy, # T2+T3 accuracy + "task_type_breakdown": self.stats["task_type_breakdown"], + "regional_breakdown": self.stats["regional_breakdown"], # Like check_finsearchcomp_progress.py + "t1_completed": self.stats["task_type_breakdown"]["T1"]["completed"], + }) + else: # default + total_judged = self.stats["correct_answers"] + self.stats["incorrect_answers"] + accuracy = ( + (self.stats["correct_answers"] / total_judged * 100) + if total_judged > 0 + else 0 + ) + result.update({ + "correct_answers": self.stats["correct_answers"], + "incorrect_answers": self.stats["incorrect_answers"], + "accuracy": accuracy, + }) + + return result + + def get_tasks_json(self) -> List[Dict[str, Any]]: + """Get tasks list as JSON for web interface""" + tasks_list = [] + for task_info in sorted( + self.tasks.values(), key=lambda x: x["last_modified"], reverse=True + ): + # For FutureX/xbench, don't include judge_result (like check_futurex_progress.py, check_xbench_progress.py) + task_dict = { + "task_id": task_info["task_id"], + "status": task_info["status"], + "task_type": task_info["task_type"], + "execution_time": task_info["execution_time"], + } + + # Exclude judge_result for FutureX and xbench (like check_futurex_progress.py, check_xbench_progress.py) + if 
self.benchmark_type not in ["futurex", "xbench"]: + task_dict["judge_result"] = task_info["judge_result"] + else: + # For FutureX/xbench, include final_answer instead (for display purposes) + task_dict["final_answer"] = task_info.get("final_answer", "") + + tasks_list.append(task_dict) + + return tasks_list + + def scan_and_update(self): + """Scan log files and update statistics""" + log_files = self.scan_log_files() + + for file_path in log_files: + data = self.parse_task_file(file_path) + if data: + task_info = self.extract_task_info(data, file_path) + self.update_statistics(task_info) + + self.stats["last_update"] = datetime.now() + + def get_task_info(self, task_id: str) -> Optional[Dict[str, Any]]: + """Get information about a specific task""" + return self.tasks.get(task_id) + + def generate_task_report(self, task_id: str) -> Optional[str]: + """Generate report by calling the standalone report generator""" + try: + # Get task info to extract attempt number + task_info = self.get_task_info(task_id) + if not task_info: + return f"Error: Task {task_id} not found" + + attempt = task_info.get("attempt", 1) + + # Import the report generator module + import importlib.util + report_generator_path = os.path.join( + os.path.dirname(__file__), "generate_benchmark_report.py" + ) + + spec = importlib.util.spec_from_file_location( + "generate_benchmark_report", + report_generator_path, + ) + if spec is None or spec.loader is None: + return f"Error: Could not load report generator module" + + report_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(report_module) + + # Call the report generator + report_path = report_module.generate_task_report_from_log( + log_dir=str(self.log_folder), + task_id=task_id, + attempt=attempt, + output_dir=None, # Use default output directory + ) + + if report_path and os.path.exists(report_path): + # Read and return the generated report + with open(report_path, "r", encoding="utf-8") as f: + return f.read() + + return f"Error: Failed to generate report for task {task_id}" + + except Exception as e: + return f"Error generating report for task {task_id}: {str(e)}" + + +def main(): + parser = argparse.ArgumentParser(description="Benchmark Monitor with Web Interface") + parser.add_argument("log_folder", nargs="?", default=".", help="Path to benchmark log folder") + parser.add_argument("--web-port", type=int, default=8080, help="Web interface port") + + args = parser.parse_args() + + if not Path(args.log_folder).exists(): + print(f"Error: Log folder not found: {args.log_folder}") + return 1 + + # Create monitor + monitor = BenchmarkMonitor(args.log_folder) + + # Start web dashboard + dashboard = WebDashboard(monitor, args.web_port) + dashboard.start_server() + + print("Benchmark Monitor started") + print(f"Monitoring logs in: {args.log_folder}") + print(f"Web dashboard: http://localhost:{dashboard.port}") + print("Press Ctrl+C to stop") + + try: + while True: + monitor.scan_and_update() + time.sleep(30) # Update every 30 seconds + except KeyboardInterrupt: + print("\nMonitor stopped by user") + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/utils/progress_check/check_finsearchcomp_progress.py b/utils/progress_check/check_finsearchcomp_progress.py index 11045825..c96e9cc5 100755 --- a/utils/progress_check/check_finsearchcomp_progress.py +++ b/utils/progress_check/check_finsearchcomp_progress.py @@ -21,7 +21,7 @@ import re import sys from pathlib import Path -from typing import Dict, List, Tuple +from typing import Dict, List, 
Tuple, Any def extract_task_type(task_id: str) -> str: @@ -61,7 +61,7 @@ def extract_region_from_label(label: str) -> str: return "Unknown" -def analyze_finsearchcomp_results(log_folder: str) -> Dict[str, any]: +def analyze_finsearchcomp_results(log_folder: str) -> Dict[str, Any]: """ Analyze FinSearchComp benchmark results from JSON log files. @@ -192,7 +192,7 @@ def analyze_finsearchcomp_results(log_folder: str) -> Dict[str, any]: def display_results( - results: Dict[str, any], + results: Dict[str, Any], correct_files: List[str], incorrect_files: List[Tuple[str, str]], error_files: List[Tuple[str, str]], diff --git a/utils/progress_check/generate_benchmark_report.py b/utils/progress_check/generate_benchmark_report.py new file mode 100644 index 00000000..ffff25c7 --- /dev/null +++ b/utils/progress_check/generate_benchmark_report.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +""" +Generic Benchmark Task Report Generator + +This script generates detailed text reports for tasks from benchmark log files. +Works with any benchmark dataset (GAIA, FinSearchComp, FutureX, etc.) +""" + +import json +import os +import sys +from pathlib import Path +from typing import Optional, Dict, Any + + +def find_task_log_file(log_dir: str, task_id: str, attempt: int = 1) -> Optional[Path]: + """Find task log file in the log directory""" + log_path = Path(log_dir) + if not log_path.exists(): + return None + + # Try to find the log file + pattern = f"task_{task_id}_attempt_{attempt}.json" + log_file = log_path / pattern + + if log_file.exists(): + return log_file + + # Try without attempt number + pattern = f"task_{task_id}.json" + log_file = log_path / pattern + if log_file.exists(): + return log_file + + return None + + +def load_task_from_log(log_file: Path) -> Optional[Dict[str, Any]]: + """Load task data from log file""" + try: + with open(log_file, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + return None + + +def extract_question(log_data: Dict[str, Any]) -> str: + """Extract question from log data in various formats""" + # Try different possible locations + if "task_question" in log_data: + return log_data["task_question"] + + if "input" in log_data: + input_data = log_data["input"] + if isinstance(input_data, dict): + if "task_description" in input_data: + return input_data["task_description"] + elif "task_question" in input_data: + return input_data["task_question"] + elif isinstance(input_data, str): + return input_data + + return "N/A" + + +def extract_metadata_info(log_data: Dict[str, Any]) -> Dict[str, Any]: + """Extract metadata information from log data""" + metadata_info = {} + + # Try to get metadata from various locations + metadata = log_data.get("metadata", {}) + if isinstance(metadata, dict): + metadata_info.update(metadata) + + # Also check input.metadata + if "input" in log_data and isinstance(log_data["input"], dict): + input_metadata = log_data["input"].get("metadata", {}) + if isinstance(input_metadata, dict): + metadata_info.update(input_metadata) + + return metadata_info + + +def generate_task_report_from_log( + log_dir: str, + task_id: str, + attempt: int = 1, + output_dir: Optional[str] = None +) -> Optional[str]: + """Generate detailed text report from task log file""" + + # Find the log file + log_file = find_task_log_file(log_dir, task_id, attempt) + if not log_file: + print(f"❌ Error: Log file not found for task {task_id} (attempt {attempt})") + return None + + # Load task data + log_data = load_task_from_log(log_file) + 
if not log_data: + print(f"❌ Error: Failed to load log file: {log_file}") + return None + + # Set output directory (default to log_dir/reports) + if output_dir is None: + output_dir = os.path.join(log_dir, "reports") + + # Ensure the directory exists + os.makedirs(output_dir, exist_ok=True) + + # Generate report file + report_filename = f"task_{task_id}_report.txt" + report_path = os.path.join(output_dir, report_filename) + + # Extract information + question = extract_question(log_data) + ground_truth = log_data.get("ground_truth", "N/A") + final_answer = log_data.get("final_boxed_answer", log_data.get("final_answer", "N/A")) + status = log_data.get("status", "unknown") + judge_result = log_data.get("judge_result", "N/A") + error = log_data.get("error", "") + + # Extract execution time + execution_time = None + start_time = log_data.get("start_time") + end_time = log_data.get("end_time") + if start_time and end_time: + try: + from datetime import datetime + start_dt = datetime.fromisoformat(start_time.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(end_time.replace("Z", "+00:00")) + execution_time = (end_dt - start_dt).total_seconds() + except Exception: + pass + + # Extract metadata + metadata_info = extract_metadata_info(log_data) + + # Generate report + with open(report_path, "w", encoding="utf-8") as f: + f.write("=" * 80 + "\n") + f.write(f"Benchmark Task Report: {task_id}\n") + f.write("=" * 80 + "\n\n") + + # Basic information + f.write("1. Task Basic Information\n") + f.write("-" * 40 + "\n") + f.write(f"Task ID: {task_id}\n") + f.write(f"Status: {status}\n") + f.write(f"Judge Result: {judge_result}\n") + if execution_time: + f.write(f"Execution Time: {execution_time:.2f} seconds\n") + if log_data.get("task_file_name"): + f.write(f"File Attachment: {log_data['task_file_name']}\n") + f.write("\n\n") + + # Question content + f.write("2. Question Content\n") + f.write("-" * 40 + "\n") + f.write(f"{question}\n\n\n") + + # Ground truth answer + f.write("3. Ground Truth Answer\n") + f.write("-" * 40 + "\n") + f.write(f"{ground_truth}\n\n\n") + + # Model answer + f.write("4. Model Answer\n") + f.write("-" * 40 + "\n") + f.write(f"{final_answer}\n\n\n") + + # Error information (if any) + if error: + f.write("5. Error Information\n") + f.write("-" * 40 + "\n") + f.write(f"{error}\n\n\n") + + # Metadata (if available) + if metadata_info: + f.write("6. Task Metadata\n") + f.write("-" * 40 + "\n") + for key, value in metadata_info.items(): + if isinstance(value, dict): + f.write(f"{key}:\n") + for sub_key, sub_value in value.items(): + f.write(f" {sub_key}: {sub_value}\n") + elif isinstance(value, list): + f.write(f"{key}: {', '.join(map(str, value))}\n") + else: + f.write(f"{key}: {value}\n") + f.write("\n\n") + + # Execution steps (if available) + if "step_logs" in log_data and log_data["step_logs"]: + f.write("7. 
Execution Steps\n") + f.write("-" * 40 + "\n") + f.write(f"Total steps: {len(log_data['step_logs'])}\n") + # Optionally include step details + f.write("\n") + + f.write("=" * 80 + "\n") + f.write("End of Report\n") + f.write("=" * 80 + "\n") + + print(f"📄 Task {task_id} report saved to: {report_path}") + return report_path + + +def main(): + """Main function""" + import argparse + + parser = argparse.ArgumentParser(description="Generate benchmark task reports from log files") + parser.add_argument( + "log_dir", + type=str, + help="Path to benchmark log directory", + ) + parser.add_argument( + "task_id", + type=str, + help="Task ID to generate report for", + ) + parser.add_argument( + "--attempt", + type=int, + default=1, + help="Attempt number (default: 1)", + ) + parser.add_argument( + "--output-dir", + type=str, + default=None, + help="Output directory for reports (default: <log_dir>/reports)", + ) + + args = parser.parse_args() + + generate_task_report_from_log( + args.log_dir, + args.task_id, + args.attempt, + args.output_dir + ) + + +if __name__ == "__main__": + main() +
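
The report generator that backs the monitor's `/api/task-report/` endpoint can also be run on its own. A minimal invocation following the argparse definition above (the log directory and task ID are placeholders; substitute a real run directory and a task ID taken from its `task_*_attempt_*.json` files):

```bash
uv run utils/progress_check/generate_benchmark_report.py \
    logs/gaia-validation-claude37sonnet/20250101_0900 \
    example_task_001 \
    --attempt 1
```

By default the report is written to `<log_dir>/reports/task_<task_id>_report.txt`; pass `--output-dir` to write it elsewhere.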