diff --git a/src/claude_monitor/cli/main.py b/src/claude_monitor/cli/main.py index 3669423..c769338 100644 --- a/src/claude_monitor/cli/main.py +++ b/src/claude_monitor/cli/main.py @@ -168,6 +168,7 @@ def _run_monitoring(args: argparse.Namespace) -> None: args.refresh_rate if hasattr(args, "refresh_rate") else 10 ), data_path=str(data_path), + data_source=getattr(args, "data_source", "auto"), ) orchestrator.set_args(args) @@ -390,6 +391,7 @@ def _run_table_view( data_path=str(data_path), aggregation_mode=view_mode, timezone=args.timezone, + data_source=getattr(args, "data_source", "auto"), ) # Create table controller diff --git a/src/claude_monitor/core/settings.py b/src/claude_monitor/core/settings.py index 14aec1b..ec85290 100644 --- a/src/claude_monitor/core/settings.py +++ b/src/claude_monitor/core/settings.py @@ -34,6 +34,7 @@ def save(self, settings: "Settings") -> None: "refresh_rate": settings.refresh_rate, "reset_hour": settings.reset_hour, "view": settings.view, + "data_source": settings.data_source, "timestamp": datetime.now().isoformat(), } @@ -109,6 +110,11 @@ class Settings(BaseSettings): description="View mode (realtime, daily, monthly, session)", ) + data_source: Literal["auto", "claude", "opencode"] = Field( + default="auto", + description="Data source (auto, claude, opencode). Auto-detects available source.", + ) + @staticmethod def _get_system_timezone() -> str: """Lazy import to avoid circular dependencies.""" @@ -198,6 +204,20 @@ def validate_view(cls, v: Any) -> str: ) return v + @field_validator("data_source", mode="before") + @classmethod + def validate_data_source(cls, v: Any) -> str: + """Validate and normalize data source value.""" + if isinstance(v, str): + v_lower = v.lower() + valid_sources = ["auto", "claude", "opencode"] + if v_lower in valid_sources: + return v_lower + raise ValueError( + f"Invalid data source: {v}. Must be one of: {', '.join(valid_sources)}" + ) + return v + @field_validator("theme", mode="before") @classmethod def validate_theme(cls, v: Any) -> str: @@ -350,5 +370,6 @@ def to_namespace(self) -> argparse.Namespace: args.log_level = self.log_level args.log_file = str(self.log_file) if self.log_file else None args.version = self.version + args.data_source = self.data_source return args diff --git a/src/claude_monitor/data/__init__.py b/src/claude_monitor/data/__init__.py index c95972d..9a611f2 100644 --- a/src/claude_monitor/data/__init__.py +++ b/src/claude_monitor/data/__init__.py @@ -1,4 +1,24 @@ -"""Data package for Claude Monitor.""" +"""Data package for Claude Monitor. 
-# Import directly from modules without facade -__all__: list[str] = [] +Provides data loading from multiple sources: +- Claude Code (~/.claude/projects/*.jsonl) +- OpenCode (~/.local/share/opencode/storage/message/*.json) +""" + +from claude_monitor.data.reader import ( + DataSource, + detect_available_sources, + detect_data_source, + get_data_source_info, + load_usage_entries, + load_usage_entries_unified, +) + +__all__ = [ + "DataSource", + "detect_available_sources", + "detect_data_source", + "get_data_source_info", + "load_usage_entries", + "load_usage_entries_unified", +] diff --git a/src/claude_monitor/data/aggregator.py b/src/claude_monitor/data/aggregator.py index f353762..ba1b574 100644 --- a/src/claude_monitor/data/aggregator.py +++ b/src/claude_monitor/data/aggregator.py @@ -93,7 +93,11 @@ class UsageAggregator: """Aggregates usage data for daily and monthly reports.""" def __init__( - self, data_path: str, aggregation_mode: str = "daily", timezone: str = "UTC" + self, + data_path: str, + aggregation_mode: str = "daily", + timezone: str = "UTC", + data_source: str = "auto", ): """Initialize the aggregator. @@ -101,10 +105,12 @@ def __init__( data_path: Path to the data directory aggregation_mode: Mode of aggregation ('daily' or 'monthly') timezone: Timezone string for date formatting + data_source: Data source to use ("auto", "claude", "opencode") """ self.data_path = data_path self.aggregation_mode = aggregation_mode self.timezone = timezone + self.data_source = data_source self.timezone_handler = TimezoneHandler() def _aggregate_by_period( @@ -272,12 +278,25 @@ def aggregate(self) -> List[Dict[str, Any]]: Returns: List of aggregated data based on aggregation_mode """ - from claude_monitor.data.reader import load_usage_entries + from claude_monitor.data.reader import DataSource, load_usage_entries_unified logger.info(f"Starting aggregation in {self.aggregation_mode} mode") - # Load usage entries - entries, _ = load_usage_entries(data_path=self.data_path) + # Convert string source to DataSource enum + source_map = { + "auto": DataSource.AUTO, + "all": DataSource.ALL, + "claude": DataSource.CLAUDE, + "opencode": DataSource.OPENCODE, + } + source_enum = source_map.get(self.data_source.lower(), DataSource.AUTO) + + # Load usage entries from all available sources + entries, _, detected_source = load_usage_entries_unified( + data_path=self.data_path, + source=source_enum, + ) + logger.info(f"Loaded {len(entries)} entries from {detected_source.value}") if not entries: logger.warning("No usage entries found") diff --git a/src/claude_monitor/data/analysis.py b/src/claude_monitor/data/analysis.py index a7e144c..9bc5c33 100644 --- a/src/claude_monitor/data/analysis.py +++ b/src/claude_monitor/data/analysis.py @@ -10,7 +10,7 @@ from claude_monitor.core.calculations import BurnRateCalculator from claude_monitor.core.models import CostMode, SessionBlock, UsageEntry from claude_monitor.data.analyzer import SessionAnalyzer -from claude_monitor.data.reader import load_usage_entries +from claude_monitor.data.reader import DataSource, load_usage_entries_unified logger = logging.getLogger(__name__) @@ -20,6 +20,7 @@ def analyze_usage( use_cache: bool = True, quick_start: bool = False, data_path: Optional[str] = None, + data_source: str = "auto", ) -> Dict[str, Any]: """ Main entry point to generate response_final.json. 
@@ -35,13 +36,14 @@ def analyze_usage( use_cache: Use cached data when available quick_start: Use minimal data for quick startup (last 24h only) data_path: Optional path to Claude data directory + data_source: Data source to use ("auto", "claude", "opencode") Returns: Dictionary with analyzed blocks """ logger.info( f"analyze_usage called with hours_back={hours_back}, use_cache={use_cache}, " - f"quick_start={quick_start}, data_path={data_path}" + f"quick_start={quick_start}, data_path={data_path}, data_source={data_source}" ) if quick_start and hours_back is None: @@ -50,15 +52,32 @@ def analyze_usage( elif quick_start: logger.info(f"Quick start mode: loading last {hours_back} hours") + # Convert string source to DataSource enum + source_map = { + "auto": DataSource.AUTO, + "all": DataSource.ALL, + "claude": DataSource.CLAUDE, + "opencode": DataSource.OPENCODE, + } + data_source_lower = data_source.lower() + source_enum = source_map.get(data_source_lower) + if source_enum is None: + logger.warning(f"Unknown data_source '{data_source}', defaulting to 'auto'") + source_enum = DataSource.AUTO + start_time = datetime.now() - entries, raw_entries = load_usage_entries( + entries, raw_entries, detected_source = load_usage_entries_unified( data_path=data_path, hours_back=hours_back, mode=CostMode.AUTO, include_raw=True, + source=source_enum, ) load_time = (datetime.now() - start_time).total_seconds() - logger.info(f"Data loaded in {load_time:.3f}s") + logger.info( + f"Data loaded in {load_time:.3f}s from {detected_source.value} " + f"({len(entries)} entries)" + ) start_time = datetime.now() analyzer = SessionAnalyzer(session_duration_hours=5) @@ -93,6 +112,7 @@ def analyze_usage( "transform_time_seconds": transform_time, "cache_used": use_cache, "quick_start": quick_start, + "data_source": detected_source.value, } result = _create_result(blocks, entries, metadata) diff --git a/src/claude_monitor/data/opencode_reader.py b/src/claude_monitor/data/opencode_reader.py new file mode 100644 index 0000000..833dc8d --- /dev/null +++ b/src/claude_monitor/data/opencode_reader.py @@ -0,0 +1,263 @@ +"""OpenCode data reader for Claude Monitor. + +Reads usage data from OpenCode's storage format at ~/.local/share/opencode/storage/. +OpenCode stores data in a hierarchical JSON structure: +- sessions/{projectHash}/{sessionID}.json - Session metadata +- message/{sessionID}/{msgID}.json - Individual messages with token data +""" + +import json +import logging +from datetime import datetime, timedelta +from datetime import timezone as tz +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Tuple + +from claude_monitor.core.models import CostMode, UsageEntry +from claude_monitor.core.pricing import PricingCalculator + +logger = logging.getLogger(__name__) + +# Default OpenCode storage path +OPENCODE_STORAGE_PATH = "~/.local/share/opencode/storage" +OPENCODE_MESSAGE_DIR = "message" + + +def load_opencode_entries( + data_path: Optional[str] = None, + hours_back: Optional[int] = None, + mode: CostMode = CostMode.AUTO, + include_raw: bool = False, +) -> Tuple[List[UsageEntry], Optional[List[Dict[str, Any]]]]: + """Load and convert OpenCode message files to UsageEntry objects. 
+ + Args: + data_path: Path to OpenCode storage directory + (defaults to ~/.local/share/opencode/storage) + hours_back: Only include entries from last N hours + mode: Cost calculation mode + include_raw: Whether to return raw JSON data alongside entries + + Returns: + Tuple of (usage_entries, raw_data) where raw_data is None unless include_raw=True + """ + storage_path = Path(data_path if data_path else OPENCODE_STORAGE_PATH).expanduser() + message_path = storage_path / OPENCODE_MESSAGE_DIR + + if not message_path.exists(): + logger.warning("OpenCode message path does not exist: %s", message_path) + return [], None + + pricing_calculator = PricingCalculator() + + if hours_back: + cutoff_dt = datetime.now(tz.utc) - timedelta(hours=hours_back) + cutoff_ms = int(cutoff_dt.timestamp() * 1000) + else: + cutoff_ms = None + + # Find all message JSON files + message_files = _find_message_files(message_path) + if not message_files: + logger.warning("No message files found in %s", message_path) + return [], None + + all_entries: List[UsageEntry] = [] + raw_entries: Optional[List[Dict[str, Any]]] = [] if include_raw else None + processed_ids: Set[str] = set() + + for file_path in message_files: + entry, raw_data = _process_message_file( + file_path, + mode, + cutoff_ms, + processed_ids, + include_raw, + pricing_calculator, + ) + if entry: + all_entries.append(entry) + if include_raw and raw_data and raw_entries is not None: + raw_entries.append(raw_data) + + all_entries.sort(key=lambda e: e.timestamp) + + logger.info( + f"Processed {len(all_entries)} OpenCode entries from {len(message_files)} files" + ) + + return all_entries, raw_entries + + +def load_opencode_raw_entries( + data_path: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Load all raw OpenCode message entries without processing. + + Args: + data_path: Path to OpenCode storage directory + + Returns: + List of raw JSON dictionaries + """ + storage_path = Path(data_path if data_path else OPENCODE_STORAGE_PATH).expanduser() + message_path = storage_path / OPENCODE_MESSAGE_DIR + + if not message_path.exists(): + return [] + + message_files = _find_message_files(message_path) + all_raw_entries: List[Dict[str, Any]] = [] + + for file_path in message_files: + try: + with open(file_path, encoding="utf-8") as f: + data = json.load(f) + # Only include messages with token data (assistant messages) + if data.get("tokens"): + all_raw_entries.append(data) + except (json.JSONDecodeError, IOError) as e: + logger.debug(f"Failed to read {file_path}: {e}") + continue + + return all_raw_entries + + +def _find_message_files(message_path: Path) -> List[Path]: + """Find all message JSON files in the message directory. + + OpenCode stores messages in: message/{sessionID}/{msgID}.json + """ + if not message_path.exists(): + return [] + + # Find all .json files in session subdirectories + return list(message_path.rglob("msg_*.json")) + + +def _process_message_file( + file_path: Path, + mode: CostMode, + cutoff_ms: Optional[int], + processed_ids: Set[str], + include_raw: bool, + pricing_calculator: PricingCalculator, +) -> Tuple[Optional[UsageEntry], Optional[Dict[str, Any]]]: + """Process a single OpenCode message JSON file. 
+ + Args: + file_path: Path to the message JSON file + mode: Cost calculation mode + cutoff_ms: Cutoff timestamp in milliseconds (None = no filter) + processed_ids: Set of already processed message IDs + include_raw: Whether to include raw data + pricing_calculator: Pricing calculator instance + + Returns: + Tuple of (UsageEntry or None, raw_data or None) + """ + try: + with open(file_path, encoding="utf-8") as f: + data = json.load(f) + except (json.JSONDecodeError, IOError) as e: + logger.debug(f"Failed to read message file {file_path}: {e}") + return None, None + + # Only process assistant messages with token data + if data.get("role") != "assistant": + return None, None + + tokens = data.get("tokens") + if not tokens: + return None, None + + # Check for duplicate + msg_id = data.get("id", "") + if not msg_id: + # Generate a unique ID from file path if missing + msg_id = str(file_path) + if msg_id in processed_ids: + return None, None + + # Check time filter + time_data = data.get("time", {}) + created_ms = time_data.get("created") + if created_ms and cutoff_ms and created_ms < cutoff_ms: + return None, None + + # Extract token counts + input_tokens = tokens.get("input", 0) + output_tokens = tokens.get("output", 0) + reasoning_tokens = tokens.get("reasoning", 0) + cache_data = tokens.get("cache", {}) + cache_read_tokens = cache_data.get("read", 0) + cache_write_tokens = cache_data.get("write", 0) + + # Skip if no meaningful token usage + total_tokens = ( + input_tokens + + output_tokens + + reasoning_tokens + + cache_read_tokens + + cache_write_tokens + ) + if total_tokens == 0: + return None, None + + # Parse timestamp + if created_ms: + timestamp = datetime.fromtimestamp(created_ms / 1000, tz=tz.utc) + else: + return None, None + + # Get model info + model_id = data.get("modelID", "unknown") + + # Calculate cost + # Note: OpenCode uses "cache.write" which maps to cache_creation_tokens + # and "cache.read" which maps to cache_read_tokens + cost_usd = pricing_calculator.calculate_cost( + model=model_id, + input_tokens=input_tokens, + output_tokens=output_tokens + reasoning_tokens, # Include reasoning in output + cache_creation_tokens=cache_write_tokens, + cache_read_tokens=cache_read_tokens, + ) + + # Mark as processed + processed_ids.add(msg_id) + + entry = UsageEntry( + timestamp=timestamp, + input_tokens=input_tokens, + output_tokens=output_tokens + reasoning_tokens, + cache_creation_tokens=cache_write_tokens, + cache_read_tokens=cache_read_tokens, + cost_usd=cost_usd, + model=model_id, + message_id=msg_id, + request_id=data.get("sessionID", "unknown"), + ) + + raw_data = data if include_raw else None + return entry, raw_data + + +def detect_opencode_installation() -> bool: + """Check if OpenCode data storage exists. + + Returns: + True if OpenCode storage directory exists + """ + storage_path = Path(OPENCODE_STORAGE_PATH).expanduser() + message_path = storage_path / OPENCODE_MESSAGE_DIR + return message_path.exists() + + +def get_opencode_storage_path() -> Path: + """Get the OpenCode storage path. + + Returns: + Path to OpenCode storage directory + """ + return Path(OPENCODE_STORAGE_PATH).expanduser() diff --git a/src/claude_monitor/data/reader.py b/src/claude_monitor/data/reader.py index 5aa8e18..6a970e9 100644 --- a/src/claude_monitor/data/reader.py +++ b/src/claude_monitor/data/reader.py @@ -1,13 +1,14 @@ """Simplified data reader for Claude Monitor. Combines functionality from file_reader, filter, mapper, and processor -into a single cohesive module. 
+into a single cohesive module. Supports both Claude Code and OpenCode data sources. """ import json import logging from datetime import datetime, timedelta from datetime import timezone as tz +from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple @@ -18,6 +19,7 @@ ) from claude_monitor.core.models import CostMode, UsageEntry from claude_monitor.core.pricing import PricingCalculator +from claude_monitor.data.opencode_reader import OPENCODE_STORAGE_PATH from claude_monitor.error_handling import report_file_error from claude_monitor.utils.time_utils import TimezoneHandler @@ -26,9 +28,21 @@ TOKEN_INPUT = "input_tokens" TOKEN_OUTPUT = "output_tokens" +# Default data path for Claude Code +CLAUDE_CODE_PATH = "~/.claude/projects" + logger = logging.getLogger(__name__) +class DataSource(Enum): + """Data source types for usage data.""" + + AUTO = "auto" # Auto-detect and combine all available sources + ALL = "all" # Explicitly use all available sources (same as AUTO) + CLAUDE = "claude" # Claude Code only (~/.claude/projects) + OPENCODE = "opencode" # OpenCode only (~/.local/share/opencode/storage) + + def load_usage_entries( data_path: Optional[str] = None, hours_back: Optional[int] = None, @@ -320,3 +334,191 @@ def _extract_metadata(self, data: Dict[str, Any]) -> Dict[str, str]: "message_id": data.get("message_id") or message.get("id", ""), "request_id": data.get("request_id") or data.get("requestId", "unknown"), } + + +# ============================================================================= +# Unified Data Loading with Multi-Source Support +# ============================================================================= + + +def detect_available_sources() -> List[DataSource]: + """Detect which data sources are available. + + Returns: + List of available DataSource enums + """ + available: List[DataSource] = [] + + # Check for Claude Code installation + claude_path = Path(CLAUDE_CODE_PATH).expanduser() + if claude_path.exists() and any(claude_path.rglob("*.jsonl")): + logger.info("Detected Claude Code data source") + available.append(DataSource.CLAUDE) + + # Check for OpenCode installation + opencode_path = Path(OPENCODE_STORAGE_PATH).expanduser() / "message" + if opencode_path.exists(): + try: + if any(opencode_path.iterdir()): + logger.info("Detected OpenCode data source") + available.append(DataSource.OPENCODE) + except (PermissionError, OSError): + pass + + return available + + +def detect_data_source() -> DataSource: + """Detect which data source is available (legacy compatibility). + + For backward compatibility, returns a single source. + Prefers OpenCode if available, otherwise Claude Code. + + Returns: + DataSource enum indicating the detected source + """ + available = detect_available_sources() + + if DataSource.OPENCODE in available: + return DataSource.OPENCODE + if DataSource.CLAUDE in available: + return DataSource.CLAUDE + + # Default to Claude Code path (legacy behavior) + logger.warning("No data source detected, defaulting to Claude Code") + return DataSource.CLAUDE + + +def load_usage_entries_unified( + data_path: Optional[str] = None, + hours_back: Optional[int] = None, + mode: CostMode = CostMode.AUTO, + include_raw: bool = False, + source: DataSource = DataSource.AUTO, +) -> Tuple[List[UsageEntry], Optional[List[Dict[str, Any]]], DataSource]: + """Load usage entries from Claude Code, OpenCode, or both. + + This is the unified entry point that supports multiple data sources + with automatic detection and merging. 
+ + Args: + data_path: Optional custom data path (overrides auto-detection) + hours_back: Only include entries from last N hours + mode: Cost calculation mode + include_raw: Whether to return raw JSON data alongside entries + source: Data source to use (AUTO/ALL combines all available sources) + + Returns: + Tuple of (usage_entries, raw_data, primary_source) + When multiple sources are combined, primary_source indicates the first source used. + """ + from claude_monitor.data.opencode_reader import load_opencode_entries + + all_entries: List[UsageEntry] = [] + all_raw: Optional[List[Dict[str, Any]]] = [] if include_raw else None + primary_source: DataSource = DataSource.CLAUDE + + # Handle AUTO/ALL - combine all available sources + if source in (DataSource.AUTO, DataSource.ALL): + available = detect_available_sources() + + if not available: + logger.warning("No data sources available") + return [], None, DataSource.CLAUDE + + # Set primary source (first available) + primary_source = available[0] + + # Load from Claude Code if available + if DataSource.CLAUDE in available: + entries, raw_data = load_usage_entries( + data_path=data_path, + hours_back=hours_back, + mode=mode, + include_raw=include_raw, + ) + all_entries.extend(entries) + if include_raw and raw_data and all_raw is not None: + all_raw.extend(raw_data) + logger.info(f"Loaded {len(entries)} entries from Claude Code") + + # Load from OpenCode if available + # Note: OpenCode always uses its own default path (~/.local/share/opencode/storage) + # in AUTO mode. The data_path parameter only applies to Claude Code. + if DataSource.OPENCODE in available: + entries, raw_data = load_opencode_entries( + data_path=None, # OpenCode uses its own default path in AUTO mode + hours_back=hours_back, + mode=mode, + include_raw=include_raw, + ) + all_entries.extend(entries) + if include_raw and raw_data and all_raw is not None: + all_raw.extend(raw_data) + logger.info(f"Loaded {len(entries)} entries from OpenCode") + + # Handle single source selection + elif source == DataSource.OPENCODE: + entries, raw_data = load_opencode_entries( + data_path=data_path, + hours_back=hours_back, + mode=mode, + include_raw=include_raw, + ) + all_entries.extend(entries) + if include_raw and raw_data and all_raw is not None: + all_raw.extend(raw_data) + primary_source = DataSource.OPENCODE + + else: # DataSource.CLAUDE + entries, raw_data = load_usage_entries( + data_path=data_path, + hours_back=hours_back, + mode=mode, + include_raw=include_raw, + ) + all_entries.extend(entries) + if include_raw and raw_data and all_raw is not None: + all_raw.extend(raw_data) + primary_source = DataSource.CLAUDE + + # Sort combined entries by timestamp + all_entries.sort(key=lambda e: e.timestamp) + + logger.info(f"Total entries loaded: {len(all_entries)}") + return all_entries, all_raw, primary_source + + +def get_data_source_info(source: DataSource) -> Dict[str, Any]: + """Get information about a data source. 
+ + Args: + source: The data source to get info for + + Returns: + Dictionary with path, exists, and description + """ + if source in (DataSource.AUTO, DataSource.ALL): + available = detect_available_sources() + return { + "path": "multiple", + "exists": len(available) > 0, + "description": f"Auto-detected sources: {[s.value for s in available]}", + "source": "auto", + } + if source == DataSource.OPENCODE: + path = Path(OPENCODE_STORAGE_PATH).expanduser() + return { + "path": str(path), + "exists": (path / "message").exists(), + "description": "OpenCode (~/.local/share/opencode/storage)", + "source": "opencode", + } + else: + path = Path(CLAUDE_CODE_PATH).expanduser() + return { + "path": str(path), + "exists": path.exists(), + "description": "Claude Code (~/.claude/projects)", + "source": "claude", + } diff --git a/src/claude_monitor/monitoring/data_manager.py b/src/claude_monitor/monitoring/data_manager.py index 0a1a7dd..13200c3 100644 --- a/src/claude_monitor/monitoring/data_manager.py +++ b/src/claude_monitor/monitoring/data_manager.py @@ -18,6 +18,7 @@ def __init__( cache_ttl: int = 30, hours_back: int = 192, data_path: Optional[str] = None, + data_source: str = "auto", ) -> None: """Initialize data manager with cache and fetch settings. @@ -25,6 +26,7 @@ def __init__( cache_ttl: Cache time-to-live in seconds hours_back: Hours of historical data to fetch data_path: Path to data directory + data_source: Data source to use ("auto", "claude", "opencode") """ self.cache_ttl: int = cache_ttl self._cache: Optional[Dict[str, Any]] = None @@ -32,6 +34,7 @@ def __init__( self.hours_back: int = hours_back self.data_path: Optional[str] = data_path + self.data_source: str = data_source self._last_error: Optional[str] = None self._last_successful_fetch: Optional[float] = None @@ -60,6 +63,7 @@ def get_data(self, force_refresh: bool = False) -> Optional[Dict[str, Any]]: quick_start=False, use_cache=False, data_path=self.data_path, + data_source=self.data_source, ) if data is not None: diff --git a/src/claude_monitor/monitoring/orchestrator.py b/src/claude_monitor/monitoring/orchestrator.py index ea70fd8..309c3ff 100644 --- a/src/claude_monitor/monitoring/orchestrator.py +++ b/src/claude_monitor/monitoring/orchestrator.py @@ -17,17 +17,23 @@ class MonitoringOrchestrator: """Orchestrates monitoring components following SRP.""" def __init__( - self, update_interval: int = 10, data_path: Optional[str] = None + self, + update_interval: int = 10, + data_path: Optional[str] = None, + data_source: str = "auto", ) -> None: """Initialize orchestrator with components. 
Args: update_interval: Seconds between updates data_path: Optional path to Claude data directory + data_source: Data source to use ("auto", "claude", "opencode") """ self.update_interval: int = update_interval - self.data_manager: DataManager = DataManager(cache_ttl=5, data_path=data_path) + self.data_manager: DataManager = DataManager( + cache_ttl=5, data_path=data_path, data_source=data_source + ) self.session_monitor: SessionMonitor = SessionMonitor() self._monitoring: bool = False diff --git a/src/tests/test_analysis.py b/src/tests/test_analysis.py index 42d1d50..fffe8b3 100644 --- a/src/tests/test_analysis.py +++ b/src/tests/test_analysis.py @@ -27,13 +27,15 @@ class TestAnalyzeUsage: """Test the main analyze_usage function.""" - @patch("claude_monitor.data.analysis.load_usage_entries") + @patch("claude_monitor.data.analysis.load_usage_entries_unified") @patch("claude_monitor.data.analysis.SessionAnalyzer") @patch("claude_monitor.data.analysis.BurnRateCalculator") def test_analyze_usage_basic( self, mock_calc_class: Mock, mock_analyzer_class: Mock, mock_load: Mock ) -> None: """Test basic analyze_usage functionality.""" + from claude_monitor.data.reader import DataSource + sample_entry = UsageEntry( timestamp=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), input_tokens=100, @@ -51,7 +53,7 @@ def test_analyze_usage_basic( entries=[sample_entry], ) - mock_load.return_value = ([sample_entry], [{"raw": "data"}]) + mock_load.return_value = ([sample_entry], [{"raw": "data"}], DataSource.CLAUDE) mock_analyzer = Mock() mock_analyzer.transform_to_blocks.return_value = [sample_block] @@ -71,17 +73,22 @@ def test_analyze_usage_basic( assert result["total_tokens"] == 150 assert result["total_cost"] == 0.001 mock_load.assert_called_once() + call_kwargs = mock_load.call_args[1] + assert call_kwargs["hours_back"] == 24 + assert call_kwargs["source"] == DataSource.AUTO mock_analyzer.transform_to_blocks.assert_called_once_with([sample_entry]) mock_analyzer.detect_limits.assert_called_once_with([{"raw": "data"}]) - @patch("claude_monitor.data.analysis.load_usage_entries") + @patch("claude_monitor.data.analysis.load_usage_entries_unified") @patch("claude_monitor.data.analysis.SessionAnalyzer") @patch("claude_monitor.data.analysis.BurnRateCalculator") def test_analyze_usage_quick_start_no_hours( self, mock_calc_class: Mock, mock_analyzer_class: Mock, mock_load: Mock ) -> None: """Test analyze_usage with quick_start=True and hours_back=None.""" - mock_load.return_value = ([], []) + from claude_monitor.data.reader import DataSource + + mock_load.return_value = ([], [], DataSource.CLAUDE) mock_analyzer = Mock() mock_analyzer.transform_to_blocks.return_value = [] mock_analyzer.detect_limits.return_value = [] @@ -89,21 +96,24 @@ def test_analyze_usage_quick_start_no_hours( mock_calc_class.return_value = Mock() result = analyze_usage(quick_start=True, hours_back=None) - mock_load.assert_called_once_with( - data_path=None, hours_back=24, mode=CostMode.AUTO, include_raw=True - ) + mock_load.assert_called_once() + call_kwargs = mock_load.call_args[1] + assert call_kwargs["hours_back"] == 24 # Quick start defaults to 24 hours + assert call_kwargs["source"] == DataSource.AUTO assert result["metadata"]["quick_start"] is True assert result["metadata"]["hours_analyzed"] == 24 - @patch("claude_monitor.data.analysis.load_usage_entries") + @patch("claude_monitor.data.analysis.load_usage_entries_unified") @patch("claude_monitor.data.analysis.SessionAnalyzer") @patch("claude_monitor.data.analysis.BurnRateCalculator") def 
test_analyze_usage_quick_start_with_hours( self, mock_calc_class: Mock, mock_analyzer_class: Mock, mock_load: Mock ) -> None: """Test analyze_usage with quick_start=True and specific hours_back.""" - mock_load.return_value = ([], []) + from claude_monitor.data.reader import DataSource + + mock_load.return_value = ([], [], DataSource.CLAUDE) mock_analyzer = Mock() mock_analyzer.transform_to_blocks.return_value = [] mock_analyzer.detect_limits.return_value = [] @@ -111,20 +121,23 @@ def test_analyze_usage_quick_start_with_hours( mock_calc_class.return_value = Mock() result = analyze_usage(quick_start=True, hours_back=48) - mock_load.assert_called_once_with( - data_path=None, hours_back=48, mode=CostMode.AUTO, include_raw=True - ) + mock_load.assert_called_once() + call_kwargs = mock_load.call_args[1] + assert call_kwargs["hours_back"] == 48 # User-specified hours preserved + assert call_kwargs["source"] == DataSource.AUTO assert result["metadata"]["quick_start"] is True assert result["metadata"]["hours_analyzed"] == 48 - @patch("claude_monitor.data.analysis.load_usage_entries") + @patch("claude_monitor.data.analysis.load_usage_entries_unified") @patch("claude_monitor.data.analysis.SessionAnalyzer") @patch("claude_monitor.data.analysis.BurnRateCalculator") def test_analyze_usage_with_limits( self, mock_calc_class: Mock, mock_analyzer_class: Mock, mock_load: Mock ) -> None: """Test analyze_usage with limit detection.""" + from claude_monitor.data.reader import DataSource + sample_entry = UsageEntry( timestamp=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), input_tokens=100, @@ -149,7 +162,7 @@ def test_analyze_usage_with_limits( "reset_time": datetime(2024, 1, 1, 14, 0, tzinfo=timezone.utc), } - mock_load.return_value = ([sample_entry], [{"raw": "data"}]) + mock_load.return_value = ([sample_entry], [{"raw": "data"}], DataSource.CLAUDE) mock_analyzer = Mock() mock_analyzer.transform_to_blocks.return_value = [sample_block] @@ -163,13 +176,15 @@ def test_analyze_usage_with_limits( assert result["metadata"]["limits_detected"] == 1 assert hasattr(sample_block, "limit_messages") - @patch("claude_monitor.data.analysis.load_usage_entries") + @patch("claude_monitor.data.analysis.load_usage_entries_unified") @patch("claude_monitor.data.analysis.SessionAnalyzer") @patch("claude_monitor.data.analysis.BurnRateCalculator") def test_analyze_usage_no_raw_entries( self, mock_calc_class: Mock, mock_analyzer_class: Mock, mock_load: Mock ) -> None: """Test analyze_usage when no raw entries are provided.""" + from claude_monitor.data.reader import DataSource + sample_entry = UsageEntry( timestamp=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), input_tokens=100, @@ -187,7 +202,7 @@ def test_analyze_usage_no_raw_entries( entries=[sample_entry], ) - mock_load.return_value = ([sample_entry], None) + mock_load.return_value = ([sample_entry], None, DataSource.CLAUDE) mock_analyzer = Mock() mock_analyzer.transform_to_blocks.return_value = [sample_block] diff --git a/src/tests/test_monitoring_orchestrator.py b/src/tests/test_monitoring_orchestrator.py index 9cb8ed9..c7dc055 100644 --- a/src/tests/test_monitoring_orchestrator.py +++ b/src/tests/test_monitoring_orchestrator.py @@ -75,7 +75,9 @@ def test_init_with_defaults(self) -> None: assert orchestrator._last_valid_data is None assert len(orchestrator._update_callbacks) == 0 - mock_dm.assert_called_once_with(cache_ttl=5, data_path=None) + mock_dm.assert_called_once_with( + cache_ttl=5, data_path=None, data_source="auto" + ) mock_sm.assert_called_once() def 
test_init_with_custom_params(self) -> None: @@ -89,7 +91,9 @@ def test_init_with_custom_params(self) -> None: ) assert orchestrator.update_interval == 5 - mock_dm.assert_called_once_with(cache_ttl=5, data_path="/custom/path") + mock_dm.assert_called_once_with( + cache_ttl=5, data_path="/custom/path", data_source="auto" + ) class TestMonitoringOrchestratorLifecycle: diff --git a/src/tests/test_opencode_reader.py b/src/tests/test_opencode_reader.py new file mode 100644 index 0000000..cd3273b --- /dev/null +++ b/src/tests/test_opencode_reader.py @@ -0,0 +1,369 @@ +"""Tests for OpenCode data reader module.""" + +import json +import tempfile +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict +from unittest.mock import patch + +import pytest + +from claude_monitor.core.models import CostMode +from claude_monitor.data.opencode_reader import ( + OPENCODE_STORAGE_PATH, + _find_message_files, + _process_message_file, + detect_opencode_installation, + get_opencode_storage_path, + load_opencode_entries, + load_opencode_raw_entries, +) + + +class TestOpenCodeDetection: + """Tests for OpenCode installation detection.""" + + def test_detect_opencode_installation_exists(self, tmp_path: Path) -> None: + """Test detection when OpenCode storage exists.""" + message_dir = tmp_path / "message" + message_dir.mkdir(parents=True) + (message_dir / "ses_test").mkdir() + + # Patch OPENCODE_STORAGE_PATH to use the tmp_path directly (no ~ expansion needed) + with patch( + "claude_monitor.data.opencode_reader.OPENCODE_STORAGE_PATH", + str(tmp_path), + ): + result = detect_opencode_installation() + assert result is True, "Should detect OpenCode when message dir exists" + + def test_detect_opencode_installation_not_exists(self) -> None: + """Test detection when OpenCode storage doesn't exist.""" + with patch( + "claude_monitor.data.opencode_reader.OPENCODE_STORAGE_PATH", + "/nonexistent/path", + ): + result = detect_opencode_installation() + assert result is False + + def test_get_opencode_storage_path(self) -> None: + """Test getting OpenCode storage path.""" + path = get_opencode_storage_path() + assert isinstance(path, Path) + assert "opencode" in str(path) + + +class TestFindMessageFiles: + """Tests for finding message files.""" + + def test_find_message_files_empty_directory(self, tmp_path: Path) -> None: + """Test finding files in empty directory.""" + result = _find_message_files(tmp_path) + assert result == [] + + def test_find_message_files_nonexistent(self) -> None: + """Test finding files in nonexistent directory.""" + result = _find_message_files(Path("/nonexistent")) + assert result == [] + + def test_find_message_files_with_messages(self, tmp_path: Path) -> None: + """Test finding message files in valid structure.""" + # Create session directory with message files + session_dir = tmp_path / "ses_abc123" + session_dir.mkdir() + (session_dir / "msg_001.json").write_text("{}") + (session_dir / "msg_002.json").write_text("{}") + (session_dir / "other.txt").write_text("not a message") + + result = _find_message_files(tmp_path) + assert len(result) == 2 + assert all(f.name.startswith("msg_") for f in result) + + +class TestProcessMessageFile: + """Tests for processing individual message files.""" + + @pytest.fixture + def sample_message_data(self) -> Dict[str, Any]: + """Create sample OpenCode message data.""" + return { + "id": "msg_test123", + "sessionID": "ses_abc", + "role": "assistant", + "time": { + "created": 1766649366187, + "completed": 1766649369986, + }, + 
"modelID": "claude-opus-4-5", + "providerID": "anthropic", + "tokens": { + "input": 100, + "output": 500, + "reasoning": 50, + "cache": {"read": 1000, "write": 200}, + }, + "finish": "end_turn", + } + + def test_process_valid_message( + self, tmp_path: Path, sample_message_data: Dict[str, Any] + ) -> None: + """Test processing a valid assistant message.""" + from claude_monitor.core.pricing import PricingCalculator + + msg_file = tmp_path / "msg_test.json" + msg_file.write_text(json.dumps(sample_message_data)) + + pricing = PricingCalculator() + processed_ids: set = set() + + entry, raw = _process_message_file( + msg_file, + CostMode.AUTO, + None, + processed_ids, + True, + pricing, + ) + + assert entry is not None + assert entry.input_tokens == 100 + assert entry.output_tokens == 550 # output + reasoning + assert entry.cache_read_tokens == 1000 + assert entry.cache_creation_tokens == 200 + assert entry.model == "claude-opus-4-5" + assert entry.message_id == "msg_test123" + assert raw is not None + + def test_process_user_message_skipped(self, tmp_path: Path) -> None: + """Test that user messages are skipped.""" + from claude_monitor.core.pricing import PricingCalculator + + msg_data = {"id": "msg_user", "role": "user", "tokens": {"input": 100}} + msg_file = tmp_path / "msg_user.json" + msg_file.write_text(json.dumps(msg_data)) + + pricing = PricingCalculator() + entry, _ = _process_message_file( + msg_file, + CostMode.AUTO, + None, + set(), + False, + pricing, + ) + + assert entry is None + + def test_process_message_without_tokens_skipped(self, tmp_path: Path) -> None: + """Test that messages without tokens are skipped.""" + from claude_monitor.core.pricing import PricingCalculator + + msg_data = {"id": "msg_no_tokens", "role": "assistant"} + msg_file = tmp_path / "msg_no_tokens.json" + msg_file.write_text(json.dumps(msg_data)) + + pricing = PricingCalculator() + entry, _ = _process_message_file( + msg_file, + CostMode.AUTO, + None, + set(), + False, + pricing, + ) + + assert entry is None + + def test_process_duplicate_message_skipped( + self, tmp_path: Path, sample_message_data: Dict[str, Any] + ) -> None: + """Test that duplicate messages are skipped.""" + from claude_monitor.core.pricing import PricingCalculator + + msg_file = tmp_path / "msg_dup.json" + msg_file.write_text(json.dumps(sample_message_data)) + + pricing = PricingCalculator() + processed_ids = {"msg_test123"} # Already processed + + entry, _ = _process_message_file( + msg_file, + CostMode.AUTO, + None, + processed_ids, + False, + pricing, + ) + + assert entry is None + + def test_process_old_message_filtered_by_cutoff( + self, tmp_path: Path, sample_message_data: Dict[str, Any] + ) -> None: + """Test that old messages are filtered by cutoff time.""" + from claude_monitor.core.pricing import PricingCalculator + + msg_file = tmp_path / "msg_old.json" + msg_file.write_text(json.dumps(sample_message_data)) + + pricing = PricingCalculator() + # Set cutoff to future + future_cutoff_ms = int(datetime.now(timezone.utc).timestamp() * 1000) + 1000000 + + entry, _ = _process_message_file( + msg_file, + CostMode.AUTO, + future_cutoff_ms, + set(), + False, + pricing, + ) + + assert entry is None + + def test_process_invalid_json_handled(self, tmp_path: Path) -> None: + """Test that invalid JSON is handled gracefully.""" + from claude_monitor.core.pricing import PricingCalculator + + msg_file = tmp_path / "msg_invalid.json" + msg_file.write_text("not valid json") + + pricing = PricingCalculator() + entry, _ = _process_message_file( + 
msg_file, + CostMode.AUTO, + None, + set(), + False, + pricing, + ) + + assert entry is None + + +class TestLoadOpenCodeEntries: + """Tests for loading OpenCode entries.""" + + def test_load_from_nonexistent_path(self) -> None: + """Test loading from nonexistent path returns empty list.""" + entries, raw = load_opencode_entries( + data_path="/nonexistent/path", + hours_back=24, + ) + assert entries == [] + assert raw is None + + def test_load_with_real_structure(self, tmp_path: Path) -> None: + """Test loading from realistic directory structure.""" + # Create message directory structure + storage = tmp_path / "storage" + message_dir = storage / "message" + session_dir = message_dir / "ses_test123" + session_dir.mkdir(parents=True) + + # Create sample message files + msg1 = { + "id": "msg_001", + "sessionID": "ses_test123", + "role": "assistant", + "time": {"created": int(datetime.now(timezone.utc).timestamp() * 1000)}, + "modelID": "claude-sonnet-4-5", + "tokens": { + "input": 10, + "output": 100, + "reasoning": 0, + "cache": {"read": 0, "write": 0}, + }, + } + msg2 = { + "id": "msg_002", + "sessionID": "ses_test123", + "role": "assistant", + "time": {"created": int(datetime.now(timezone.utc).timestamp() * 1000)}, + "modelID": "claude-sonnet-4-5", + "tokens": { + "input": 20, + "output": 200, + "reasoning": 0, + "cache": {"read": 50, "write": 10}, + }, + } + + (session_dir / "msg_001.json").write_text(json.dumps(msg1)) + (session_dir / "msg_002.json").write_text(json.dumps(msg2)) + + entries, raw = load_opencode_entries( + data_path=str(storage), + hours_back=24, + include_raw=True, + ) + + assert len(entries) == 2 + assert raw is not None + assert len(raw) == 2 + + # Verify entries are sorted by timestamp + assert entries[0].timestamp <= entries[1].timestamp + + +class TestLoadOpenCodeRawEntries: + """Tests for loading raw OpenCode entries.""" + + def test_load_raw_from_nonexistent_path(self) -> None: + """Test loading raw from nonexistent path returns empty list.""" + entries = load_opencode_raw_entries(data_path="/nonexistent/path") + assert entries == [] + + def test_load_raw_filters_non_token_messages(self, tmp_path: Path) -> None: + """Test that messages without tokens are filtered.""" + storage = tmp_path / "storage" + message_dir = storage / "message" + session_dir = message_dir / "ses_test" + session_dir.mkdir(parents=True) + + # Message with tokens + msg_with_tokens = { + "id": "msg_001", + "role": "assistant", + "tokens": {"input": 10, "output": 100}, + } + # Message without tokens + msg_no_tokens = {"id": "msg_002", "role": "user"} + + (session_dir / "msg_001.json").write_text(json.dumps(msg_with_tokens)) + (session_dir / "msg_002.json").write_text(json.dumps(msg_no_tokens)) + + entries = load_opencode_raw_entries(data_path=str(storage)) + + assert len(entries) == 1 + assert entries[0]["id"] == "msg_001" + + +class TestDataSourceIntegration: + """Tests for data source detection and unified loading.""" + + def test_data_source_enum_values(self) -> None: + """Test DataSource enum has expected values.""" + from claude_monitor.data.reader import DataSource + + assert DataSource.AUTO.value == "auto" + assert DataSource.CLAUDE.value == "claude" + assert DataSource.OPENCODE.value == "opencode" + + def test_get_data_source_info_opencode(self) -> None: + """Test getting info for OpenCode source.""" + from claude_monitor.data.reader import DataSource, get_data_source_info + + info = get_data_source_info(DataSource.OPENCODE) + assert info["source"] == "opencode" + assert "opencode" in 
info["description"].lower() + + def test_get_data_source_info_claude(self) -> None: + """Test getting info for Claude source.""" + from claude_monitor.data.reader import DataSource, get_data_source_info + + info = get_data_source_info(DataSource.CLAUDE) + assert info["source"] == "claude" + assert "claude" in info["description"].lower() diff --git a/src/tests/test_settings.py b/src/tests/test_settings.py index af6d534..469a2cc 100644 --- a/src/tests/test_settings.py +++ b/src/tests/test_settings.py @@ -55,6 +55,7 @@ def test_save_success(self) -> None: "reset_hour": 12, "custom_limit_tokens": 1000, "view": "realtime", + "data_source": "auto", }, )() @@ -76,6 +77,7 @@ def test_save_success(self) -> None: assert data["reset_hour"] == 12 assert data["custom_limit_tokens"] == 1000 assert data["view"] == "realtime" + assert data["data_source"] == "auto" assert "timestamp" in data def test_save_without_custom_limit(self) -> None: @@ -92,6 +94,7 @@ def test_save_without_custom_limit(self) -> None: "reset_hour": None, "custom_limit_tokens": None, "view": "realtime", + "data_source": "auto", }, )() @@ -121,6 +124,7 @@ def test_save_creates_directory(self) -> None: "reset_hour": 12, "custom_limit_tokens": None, "view": "realtime", + "data_source": "auto", }, )() @@ -143,6 +147,7 @@ def test_save_error_handling(self, mock_logger: Mock) -> None: mock_settings.reset_hour = 12 mock_settings.custom_limit_tokens = None mock_settings.view = "realtime" + mock_settings.data_source = "auto" # Should not raise exception self.last_used.save(mock_settings)