diff --git a/src/claude_monitor/cli/main.py b/src/claude_monitor/cli/main.py index 3669423..1ba37d3 100644 --- a/src/claude_monitor/cli/main.py +++ b/src/claude_monitor/cli/main.py @@ -385,6 +385,38 @@ def _run_table_view( logger = logging.getLogger(__name__) try: + # Parse date filters early so they can be used for both current and historical data + from datetime import datetime + + from claude_monitor.utils.time_utils import TimezoneHandler + + tz = TimezoneHandler(args.timezone) + + def _parse_date(date_str: Optional[str]): + if not date_str: + return None + for fmt in ("%Y-%m-%d", "%Y.%m.%d", "%Y/%m/%d"): + try: + return tz.ensure_timezone(datetime.strptime(date_str, fmt)) + except ValueError: + continue + print_themed( + f"Invalid date format: {date_str}. Use one of: YYYY-MM-DD, YYYY.MM.DD, YYYY/MM/DD.", + style="warning", + ) + return None + + start_dt = _parse_date(getattr(args, "start_date", None)) + end_dt = _parse_date(getattr(args, "end_date", None)) + + # Validate date range + if start_dt and end_dt and start_dt > end_dt: + print_themed( + f"Error: start_date ({getattr(args, 'start_date', None)}) must be on or before end_date ({getattr(args, 'end_date', None)})", + style="error", + ) + return + # Create aggregator with appropriate mode aggregator = UsageAggregator( data_path=str(data_path), @@ -395,9 +427,130 @@ def _run_table_view( # Create table controller controller = TableViewsController(console=console) - # Get aggregated data + # Get aggregated data with date filters + # Note: end_date is treated as whole-day inclusive by the aggregator. + # Do not modify end_dt to 23:59:59 here; the aggregator excludes + # entries >= next day's midnight in the selected timezone. logger.info(f"Loading {view_mode} usage data...") - aggregated_data = aggregator.aggregate() + aggregated_data = aggregator.aggregate(start_dt, end_dt) + + # Initialize history manager for daily and monthly views + history_mode = getattr(args, "history", "auto") + if history_mode != "off": + from claude_monitor.data.history_manager import HistoryManager + + history_manager = HistoryManager() + + if view_mode == "daily": + # Load historical data if in auto or readonly mode + if history_mode in ["auto", "readonly"]: + # Load historical data using the same date filters + historical_data = history_manager.load_historical_daily_data( + start_date=start_dt, + end_date=end_dt, # history_manager uses inclusive dates + ) + + if historical_data: + print_themed( + f"Loaded {len(historical_data)} days from history", + style="info", + ) + + # Merge with current data + aggregated_data = history_manager.merge_with_current_data( + aggregated_data, historical_data + ) + print_themed( + f"Displaying {len(aggregated_data)} total days", + style="info", + ) + + # Save current data to history if in auto or writeonly mode + if aggregated_data and history_mode in ["auto", "writeonly"]: + saved_count = history_manager.save_daily_data(aggregated_data) + if saved_count > 0: + print_themed( + f"Saved {saved_count} days to history", style="success" + ) + + elif view_mode == "monthly": + # For monthly view, always compute current daily data first + # (aggregator uses whole-day inclusive end-date semantics) + daily_aggregator = UsageAggregator( + data_path=str(data_path), + aggregation_mode="daily", + timezone=args.timezone, + ) + current_daily = daily_aggregator.aggregate(start_dt, end_dt) + + # Load historical daily data only for auto/readonly modes + daily_historical: List[Dict[str, Any]] = [] + if history_mode in ["auto", "readonly"]: + 
daily_historical = history_manager.load_historical_daily_data( + start_date=start_dt, + end_date=end_dt, # history_manager uses inclusive dates + ) + + # Save current daily data to history in auto or writeonly mode + if current_daily and history_mode in ["auto", "writeonly"]: + saved = history_manager.save_daily_data(current_daily) + if saved > 0: + print_themed( + f"Saved {saved} days to history", style="success" + ) + + # Merge current and historical daily data + all_daily: List[Dict[str, Any]] = [] + if current_daily and daily_historical: + all_daily = history_manager.merge_with_current_data( + current_daily, daily_historical + ) + # Show data source composition + current_dates = {d.get("date") for d in current_daily} + historical_dates = {d.get("date") for d in daily_historical} + from_current = len(current_dates) + from_history_only = len(historical_dates - current_dates) + + if from_history_only > 0: + print_themed( + f"Loaded {len(all_daily)} days total ({from_current} from current session, {from_history_only} from history)", + style="info", + ) + else: + print_themed( + f"Loaded {len(all_daily)} days from current session", + style="info", + ) + elif current_daily: + all_daily = current_daily + print_themed( + f"Using {len(current_daily)} current days", style="info" + ) + elif daily_historical: + all_daily = daily_historical + print_themed( + f"Using {len(daily_historical)} historical days", + style="info", + ) + + # Always aggregate daily data into monthly + if all_daily: + monthly_from_daily = history_manager.aggregate_monthly_from_daily( + all_daily + ) + + if monthly_from_daily: + # Replace the initial aggregated_data with the one from daily + aggregated_data = monthly_from_daily + print_themed( + f"Displaying {len(aggregated_data)} months aggregated from {len(all_daily)} days", + style="info", + ) + else: + print_themed( + "No monthly data could be aggregated from daily data", + style="warning", + ) if not aggregated_data: print_themed(f"No usage data found for {view_mode} view", style="warning") diff --git a/src/claude_monitor/core/settings.py b/src/claude_monitor/core/settings.py index 14aec1b..5e42735 100644 --- a/src/claude_monitor/core/settings.py +++ b/src/claude_monitor/core/settings.py @@ -138,6 +138,16 @@ def _get_system_time_format() -> str: description="Display theme (light, dark, classic, auto)", ) + start_date: Optional[str] = Field( + default=None, + description="Start date for filtering data (formats: YYYY-MM-DD, YYYY.MM.DD, YYYY/MM/DD)", + ) + + end_date: Optional[str] = Field( + default=None, + description="End date for filtering data (formats: YYYY-MM-DD, YYYY.MM.DD, YYYY/MM/DD)", + ) + custom_limit_tokens: Optional[int] = Field( default=None, gt=0, description="Token limit for custom plan" ) @@ -170,6 +180,11 @@ def _get_system_time_format() -> str: clear: bool = Field(default=False, description="Clear saved configuration") + history: Literal["auto", "off", "readonly", "writeonly"] = Field( + default="auto", + description="History mode: auto (save+load), off (disable), readonly (load only), writeonly (save only)", + ) + @field_validator("plan", mode="before") @classmethod def validate_plan(cls, v: Any) -> str: @@ -240,6 +255,20 @@ def validate_log_level(cls, v: str) -> str: raise ValueError(f"Invalid log level: {v}") return v_upper + @field_validator("history", mode="before") + @classmethod + def validate_history(cls, v: Any) -> str: + """Validate and normalize history mode value.""" + if isinstance(v, str): + v_lower = v.lower() + valid_modes = ["auto", 
"off", "readonly", "writeonly"] + if v_lower in valid_modes: + return v_lower + raise ValueError( + f"Invalid history mode: {v}. Must be one of: {', '.join(valid_modes)}" + ) + return v + @classmethod def settings_customise_sources( cls, @@ -350,5 +379,8 @@ def to_namespace(self) -> argparse.Namespace: args.log_level = self.log_level args.log_file = str(self.log_file) if self.log_file else None args.version = self.version + args.start_date = self.start_date + args.end_date = self.end_date + args.history = self.history return args diff --git a/src/claude_monitor/data/aggregator.py b/src/claude_monitor/data/aggregator.py index f353762..654cc04 100644 --- a/src/claude_monitor/data/aggregator.py +++ b/src/claude_monitor/data/aggregator.py @@ -7,7 +7,7 @@ import logging from collections import defaultdict from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Callable, Dict, List, Optional from claude_monitor.core.models import SessionBlock, UsageEntry, normalize_model_name @@ -105,7 +105,9 @@ def __init__( self.data_path = data_path self.aggregation_mode = aggregation_mode self.timezone = timezone - self.timezone_handler = TimezoneHandler() + # Initialize handler with the user-selected timezone so subsequent + # conversions and localizations use it consistently. + self.timezone_handler = TimezoneHandler(timezone) def _aggregate_by_period( self, @@ -121,23 +123,50 @@ def _aggregate_by_period( entries: List of usage entries period_key_func: Function to extract period key from timestamp period_type: Type of period ('date' or 'month') - start_date: Optional start date filter - end_date: Optional end date filter + start_date: Optional start date filter (inclusive) + end_date: Optional end date filter (inclusive by full day in the + configured timezone; implemented by excluding entries that are + on or after next day's midnight) Returns: List of aggregated data dictionaries + + Note: + Both start_date and end_date are inclusive. If end_date is provided, + all entries from that entire day are included (up to 23:59:59.999999). """ period_data: Dict[str, AggregatedPeriod] = {} + # Normalize filter boundaries into the configured timezone for + # consistent, intuitive "whole-day inclusive" semantics. + norm_start = ( + self.timezone_handler.to_timezone(start_date, self.timezone) + if start_date + else None + ) + norm_end = ( + self.timezone_handler.to_timezone(end_date, self.timezone) + if end_date + else None + ) + for entry in entries: - # Apply date filters - if start_date and entry.timestamp < start_date: - continue - if end_date and entry.timestamp > end_date: + # Convert entry timestamp to the configured timezone for filtering + # and period-key extraction. + ts_local = self.timezone_handler.to_timezone(entry.timestamp, self.timezone) + + # Apply date filters (inclusive boundaries in local timezone) + if norm_start and ts_local < norm_start: continue + # For end_date, include all entries up to the end of that day. + # Exclude entries >= next day's midnight in local timezone. 
+ if norm_end: + next_day = norm_end + timedelta(days=1) + if ts_local >= next_day: + continue - # Get period key - period_key = period_key_func(entry.timestamp) + # Get period key using local time + period_key = period_key_func(ts_local) # Get or create period aggregate if period_key not in period_data: @@ -266,9 +295,15 @@ def calculate_totals(self, aggregated_data: List[Dict[str, Any]]) -> Dict[str, A "entries_count": total_stats.count, } - def aggregate(self) -> List[Dict[str, Any]]: + def aggregate( + self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None + ) -> List[Dict[str, Any]]: """Main aggregation method that reads data and returns aggregated results. + Args: + start_date: Optional start date filter + end_date: Optional end date filter + Returns: List of aggregated data based on aggregation_mode """ @@ -288,10 +323,10 @@ def aggregate(self) -> List[Dict[str, Any]]: if entry.timestamp.tzinfo is None: entry.timestamp = self.timezone_handler.ensure_timezone(entry.timestamp) - # Aggregate based on mode + # Aggregate based on mode with date filters if self.aggregation_mode == "daily": - return self.aggregate_daily(entries) + return self.aggregate_daily(entries, start_date, end_date) elif self.aggregation_mode == "monthly": - return self.aggregate_monthly(entries) + return self.aggregate_monthly(entries, start_date, end_date) else: raise ValueError(f"Invalid aggregation mode: {self.aggregation_mode}") diff --git a/src/claude_monitor/data/history_manager.py b/src/claude_monitor/data/history_manager.py new file mode 100644 index 0000000..add5374 --- /dev/null +++ b/src/claude_monitor/data/history_manager.py @@ -0,0 +1,443 @@ +"""Historical data management for preserving daily usage beyond Claude's 30-day cleanup. + +This module provides functionality to: +- Automatically save daily aggregated data +- Read historical data from saved files +- Merge historical and current data for comprehensive views +""" + +import json +import logging +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +class HistoryManager: + """Manages historical usage data storage and retrieval.""" + + def __init__(self, data_dir: Optional[Path] = None): + """Initialize the history manager. + + Args: + data_dir: Directory to store historical data. Defaults to ~/.claude-monitor/history + """ + self.data_dir = data_dir or Path.home() / ".claude-monitor" / "history" + self.data_dir.mkdir(parents=True, exist_ok=True) + self.daily_dir = self.data_dir / "daily" + self.daily_dir.mkdir(parents=True, exist_ok=True) + + # Session-level saved-date tracking removed to avoid short-circuiting logic + + def _get_daily_file_path(self, date_str: str) -> Path: + """Get the file path for a specific date's data. + + Args: + date_str: Date string in YYYY-MM-DD format + + Returns: + Path to the daily data file + + Raises: + ValueError: If date_str is not in valid YYYY-MM-DD format + """ + import re + + # Strict validation to prevent path traversal attacks + if not re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$", date_str): + raise ValueError(f"Invalid date format: {date_str}. Must be YYYY-MM-DD") + + # Parse and validate the date + try: + date = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError as e: + raise ValueError(f"Invalid date: {date_str}. 
{e}") + + year = date.strftime("%Y") + month = date.strftime("%m") + + month_dir = self.daily_dir / year / month + month_dir.mkdir(parents=True, exist_ok=True) + + return month_dir / f"{date_str}.json" + + def save_daily_data( + self, daily_data: List[Dict[str, Any]], overwrite: bool = False + ) -> int: + """Save daily aggregated data to historical storage. + + Args: + daily_data: List of daily aggregated data dictionaries + overwrite: Whether to overwrite existing data for the same date + + Returns: + Number of days saved + """ + saved_count = 0 + + for day_data in daily_data: + date_str = day_data.get("date") + if not date_str: + logger.warning("Daily data missing 'date' field, skipping") + continue + + file_path = self._get_daily_file_path(date_str) + + # Check if file exists and whether to overwrite + if file_path.exists() and not overwrite: + # Load existing data to check if it needs updating + try: + with open(file_path, "r", encoding="utf-8") as f: + existing_data = json.load(f) + + # If the data is identical, skip writing + if existing_data == day_data: + continue + + # Prefer existing if it is greater-or-equal across key metrics, + # and strictly greater in at least one. + fields = [ + "input_tokens", + "output_tokens", + "cache_creation_tokens", + "cache_read_tokens", + "entries_count", + ] + ge_all = all( + existing_data.get(k, 0) >= day_data.get(k, 0) for k in fields + ) + gt_any = any( + existing_data.get(k, 0) > day_data.get(k, 0) for k in fields + ) or existing_data.get("total_cost", 0.0) > day_data.get( + "total_cost", 0.0 + ) + if ge_all and gt_any: + # Keep existing file; no write needed + continue + + # Otherwise, save the new data (it has more information) + + except Exception as e: + logger.warning(f"Error reading existing data for {date_str}: {e}") + + # Save the data + try: + temp_file = file_path.with_suffix(".tmp") + with open(temp_file, "w", encoding="utf-8") as f: + json.dump(day_data, f, indent=2, default=str, ensure_ascii=False) + temp_file.replace(file_path) + + saved_count += 1 + logger.debug(f"Saved historical data for {date_str}") + + except Exception as e: + logger.error(f"Failed to save historical data for {date_str}: {e}") + + if saved_count > 0: + logger.info(f"Saved historical data for {saved_count} days") + + return saved_count + + def load_historical_daily_data( + self, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + days_back: Optional[int] = None, + ) -> List[Dict[str, Any]]: + """Load historical daily data within the specified range. + + Args: + start_date: Start date for data retrieval (inclusive) + end_date: End date for data retrieval (inclusive) + days_back: Alternative to date range - get last N days of data + + Returns: + List of historical daily data dictionaries + + Note: + Both start_date and end_date are inclusive. For example, specifying + end_date as 2024-12-15 will include the file for 2024-12-15. 
+ """ + historical_data = [] + + # Determine date range + if days_back: + end_date = datetime.now() + start_date = end_date - timedelta(days=days_back) + elif not start_date: + # Default to loading all available data + start_date = datetime(2020, 1, 1) # Arbitrary old date + + if not end_date: + end_date = datetime.now() + + # Scan for files in the date range + for year_dir in sorted(self.daily_dir.iterdir()): + if not year_dir.is_dir(): + continue + + try: + year = int(year_dir.name) + + # Skip years outside our range + if year < start_date.year or year > end_date.year: + continue + + for month_dir in sorted(year_dir.iterdir()): + if not month_dir.is_dir(): + continue + + for file_path in sorted(month_dir.glob("*.json")): + try: + # Extract date from filename + date_str = file_path.stem + file_date = datetime.strptime(date_str, "%Y-%m-%d") + + # Make file_date timezone-naive for comparison + # Convert start_date and end_date to naive if they're aware + compare_start = ( + start_date.replace(tzinfo=None) + if start_date and start_date.tzinfo + else start_date + ) + compare_end = ( + end_date.replace(tzinfo=None) + if end_date and end_date.tzinfo + else end_date + ) + + # Check if within range (inclusive on both ends) + if compare_start and file_date < compare_start: + continue + if ( + compare_end and file_date > compare_end + ): # > means end_date is inclusive + continue + + # Load the data + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + historical_data.append(data) + + except (ValueError, json.JSONDecodeError) as e: + logger.warning(f"Error loading {file_path}: {e}") + + except ValueError: + # Not a year directory, skip + continue + + # Sort by date + historical_data.sort(key=lambda x: x.get("date", "")) + + logger.info(f"Loaded {len(historical_data)} days of historical data") + return historical_data + + def merge_with_current_data( + self, current_data: List[Dict[str, Any]], historical_data: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Merge current and historical daily data, preferring current data for overlapping dates. + + Args: + current_data: Current daily aggregated data + historical_data: Historical daily data + + Returns: + Merged list of daily data, sorted by date + """ + # Create a dictionary keyed by date for efficient merging + merged_dict = {} + + # Add historical data first + for data in historical_data: + date_str = data.get("date") + if date_str: + merged_dict[date_str] = data + + # Add/overwrite with current data (more recent/accurate) + for data in current_data: + date_str = data.get("date") + if date_str: + merged_dict[date_str] = data + + # Convert back to sorted list + merged_data = list(merged_dict.values()) + merged_data.sort(key=lambda x: x.get("date", "")) + + logger.debug(f"Merged data contains {len(merged_data)} days") + return merged_data + + def cleanup_old_data(self, days_to_keep: int = 365) -> int: + """Clean up historical data older than specified days. 
+ + Args: + days_to_keep: Number of days of historical data to keep + + Returns: + Number of files deleted + """ + cutoff_date = datetime.now() - timedelta(days=days_to_keep) + deleted_count = 0 + + for year_dir in self.daily_dir.iterdir(): + if not year_dir.is_dir(): + continue + + for month_dir in year_dir.iterdir(): + if not month_dir.is_dir(): + continue + + for file_path in month_dir.glob("*.json"): + try: + date_str = file_path.stem + file_date = datetime.strptime(date_str, "%Y-%m-%d") + + if file_date < cutoff_date: + file_path.unlink() + deleted_count += 1 + + except (ValueError, OSError) as e: + logger.warning(f"Error processing {file_path}: {e}") + + # Remove empty month directories + if not any(month_dir.iterdir()): + month_dir.rmdir() + + # Remove empty year directories + if not any(year_dir.iterdir()): + year_dir.rmdir() + + if deleted_count > 0: + logger.info(f"Cleaned up {deleted_count} old historical files") + + return deleted_count + + def aggregate_monthly_from_daily( + self, daily_data: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Aggregate daily data into monthly summaries. + + Args: + daily_data: List of daily aggregated data dictionaries + + Returns: + List of monthly aggregated data dictionaries + """ + from collections import defaultdict + + monthly_dict = {} + + for day in daily_data: + # Extract month key from date "2024-12-01" -> "2024-12" + date_str = day.get("date", "") + if not date_str or len(date_str) < 7: + continue + + month_key = date_str[:7] # "YYYY-MM" + + # Initialize month data if not exists + if month_key not in monthly_dict: + monthly_dict[month_key] = { + "month": month_key, + "input_tokens": 0, + "output_tokens": 0, + "cache_creation_tokens": 0, + "cache_read_tokens": 0, + "total_cost": 0.0, + "entries_count": 0, + "models_used": set(), + "model_breakdowns": defaultdict( + lambda: { + "input_tokens": 0, + "output_tokens": 0, + "cache_creation_tokens": 0, + "cache_read_tokens": 0, + "cost": 0.0, + "count": 0, + } + ), + } + + # Accumulate statistics + month_data = monthly_dict[month_key] + month_data["input_tokens"] += day.get("input_tokens", 0) + month_data["output_tokens"] += day.get("output_tokens", 0) + month_data["cache_creation_tokens"] += day.get("cache_creation_tokens", 0) + month_data["cache_read_tokens"] += day.get("cache_read_tokens", 0) + month_data["total_cost"] += day.get("total_cost", 0.0) + month_data["entries_count"] += day.get("entries_count", 0) + + # Accumulate models used + models = day.get("models_used", []) + if isinstance(models, list): + month_data["models_used"].update(models) + + # Accumulate model breakdowns + model_breakdowns = day.get("model_breakdowns", {}) + if isinstance(model_breakdowns, dict): + for model, stats in model_breakdowns.items(): + breakdown = month_data["model_breakdowns"][model] + breakdown["input_tokens"] += stats.get("input_tokens", 0) + breakdown["output_tokens"] += stats.get("output_tokens", 0) + breakdown["cache_creation_tokens"] += stats.get( + "cache_creation_tokens", 0 + ) + breakdown["cache_read_tokens"] += stats.get("cache_read_tokens", 0) + breakdown["cost"] += stats.get("cost", 0.0) + breakdown["count"] += stats.get("count", 0) + + # Convert to list format + result = [] + for month_key in sorted(monthly_dict.keys()): + month_data = monthly_dict[month_key] + # Convert set to sorted list for models_used + month_data["models_used"] = sorted(list(month_data["models_used"])) + # Convert defaultdict to regular dict for model_breakdowns + month_data["model_breakdowns"] = 
dict(month_data["model_breakdowns"]) + result.append(month_data) + + return result + + def get_statistics(self) -> Dict[str, Any]: + """Get statistics about stored historical data. + + Returns: + Dictionary with statistics about historical data + """ + total_files = 0 + oldest_date = None + newest_date = None + total_size = 0 + + for year_dir in self.daily_dir.iterdir(): + if not year_dir.is_dir(): + continue + + for month_dir in year_dir.iterdir(): + if not month_dir.is_dir(): + continue + + for file_path in month_dir.glob("*.json"): + total_files += 1 + total_size += file_path.stat().st_size + + try: + date_str = file_path.stem + file_date = datetime.strptime(date_str, "%Y-%m-%d") + + if oldest_date is None or file_date < oldest_date: + oldest_date = file_date + if newest_date is None or file_date > newest_date: + newest_date = file_date + + except ValueError: + continue + + return { + "total_files": total_files, + "oldest_date": oldest_date.strftime("%Y-%m-%d") if oldest_date else None, + "newest_date": newest_date.strftime("%Y-%m-%d") if newest_date else None, + "total_size_mb": round(total_size / (1024 * 1024), 2), + "data_directory": str(self.daily_dir), + } diff --git a/src/tests/test_aggregator.py b/src/tests/test_aggregator.py index ae0dd63..a5a1304 100644 --- a/src/tests/test_aggregator.py +++ b/src/tests/test_aggregator.py @@ -2,6 +2,7 @@ from datetime import datetime, timezone from typing import List +from unittest.mock import Mock, patch import pytest @@ -348,9 +349,8 @@ def test_aggregate_daily_with_date_filter( ) -> None: """Test daily aggregation with date filters.""" start_date = datetime(2024, 1, 15, tzinfo=timezone.utc) - end_date = datetime( - 2024, 1, 31, 23, 59, 59, tzinfo=timezone.utc - ) # Include the whole day + # end_date is inclusive - pass Jan 31 to include all of Jan 31 + end_date = datetime(2024, 1, 31, tzinfo=timezone.utc) result = aggregator.aggregate_daily(sample_entries, start_date, end_date) @@ -620,3 +620,171 @@ def test_period_sorting(self, aggregator: UsageAggregator) -> None: assert monthly_result[0]["month"] == "2024-01" assert monthly_result[1]["month"] == "2024-02" assert monthly_result[2]["month"] == "2024-03" + + def test_aggregate_daily_with_date_filters( + self, aggregator: UsageAggregator + ) -> None: + """Test aggregate_daily with date range filters.""" + entries = [] + # Create entries across 10 days + for i in range(10): + date = datetime(2024, 1, i + 1, 12, 0, tzinfo=timezone.utc) + entries.append( + UsageEntry( + timestamp=date, + input_tokens=100 * (i + 1), + output_tokens=50 * (i + 1), + cost_usd=0.001 * (i + 1), + model="claude-3-haiku", + message_id=f"msg_{i}", + request_id=f"req_{i}", + ) + ) + + # Filter for days 3-7 (Jan 3 to Jan 7) + start_date = datetime(2024, 1, 3, tzinfo=timezone.utc) + # end_date is inclusive - to get Jan 3-7, pass Jan 7 + end_date = datetime(2024, 1, 7, tzinfo=timezone.utc) + + result = aggregator.aggregate_daily(entries, start_date, end_date) + + # Should have 5 days (Jan 3, 4, 5, 6, 7) + assert len(result) == 5 + assert result[0]["date"] == "2024-01-03" + assert result[-1]["date"] == "2024-01-07" + + # Verify token counts for first day (Jan 3 = day 3, 300 input tokens) + assert result[0]["input_tokens"] == 300 + assert result[0]["output_tokens"] == 150 + + def test_aggregate_monthly_with_date_filters( + self, aggregator: UsageAggregator + ) -> None: + """Test aggregate_monthly with date range filters.""" + entries = [] + # Create entries spanning 3 months + for month in [11, 12]: + for day in [5, 15, 25]: + 
date = datetime(2024, month, day, tzinfo=timezone.utc) + entries.append( + UsageEntry( + timestamp=date, + input_tokens=1000, + output_tokens=500, + cost_usd=0.01, + model="claude-3-haiku", + message_id=f"msg_{month}_{day}", + request_id=f"req_{month}_{day}", + ) + ) + + # Also add January 2025 entries + for day in [5, 15]: + date = datetime(2025, 1, day, tzinfo=timezone.utc) + entries.append( + UsageEntry( + timestamp=date, + input_tokens=1000, + output_tokens=500, + cost_usd=0.01, + model="claude-3-haiku", + message_id=f"msg_2025_1_{day}", + request_id=f"req_2025_1_{day}", + ) + ) + + # Filter to December 2024 only + start_date = datetime(2024, 12, 1, tzinfo=timezone.utc) + end_date = datetime(2025, 1, 1, tzinfo=timezone.utc) + + result = aggregator.aggregate_monthly(entries, start_date, end_date) + + # Should have 1 month (December 2024) + assert len(result) == 1 + assert result[0]["month"] == "2024-12" + assert result[0]["input_tokens"] == 3000 # 3 days * 1000 + assert result[0]["output_tokens"] == 1500 # 3 days * 500 + + @patch("claude_monitor.data.reader.load_usage_entries") + def test_aggregate_with_date_filters( + self, mock_load: Mock, aggregator: UsageAggregator + ) -> None: + """Test main aggregate method with date filters.""" + entries = [] + for i in range(5): + date = datetime(2024, 1, i + 1, 12, 0, tzinfo=timezone.utc) + entries.append( + UsageEntry( + timestamp=date, + input_tokens=100, + output_tokens=50, + cost_usd=0.001, + model="claude-3-haiku", + message_id=f"msg_{i}", + request_id=f"req_{i}", + ) + ) + + mock_load.return_value = (entries, None) + + # Test with date filters + start_date = datetime(2024, 1, 2, tzinfo=timezone.utc) + # end_date is inclusive - to get Jan 2-3, pass Jan 3 + end_date = datetime(2024, 1, 3, tzinfo=timezone.utc) + + result = aggregator.aggregate(start_date=start_date, end_date=end_date) + + # Should have 2 days (Jan 2, 3) + assert len(result) == 2 + assert result[0]["date"] == "2024-01-02" + assert result[1]["date"] == "2024-01-03" + + # Test without filters - should return all + result_all = aggregator.aggregate() + assert len(result_all) == 5 + + def test_timezone_grouping_and_filters(self, tmp_path) -> None: + """Entries should be grouped and filtered using the selected timezone.""" + from claude_monitor.core.models import UsageEntry + + # Two entries around the UTC day boundary + e1 = UsageEntry( + timestamp=datetime(2023, 12, 31, 23, 30, tzinfo=timezone.utc), + input_tokens=100, + output_tokens=50, + cache_creation_tokens=0, + cache_read_tokens=0, + cost_usd=0.001, + model="m", + message_id="a", + request_id="a", + ) + e2 = UsageEntry( + timestamp=datetime(2024, 1, 1, 0, 30, tzinfo=timezone.utc), + input_tokens=200, + output_tokens=100, + cache_creation_tokens=0, + cache_read_tokens=0, + cost_usd=0.002, + model="m", + message_id="b", + request_id="b", + ) + + entries = [e1, e2] + + # Under UTC they should fall into different dates (2023-12-31 and 2024-01-01) + agg_utc = UsageAggregator(data_path=str(tmp_path), timezone="UTC") + res_utc = agg_utc.aggregate_daily(entries) + assert len(res_utc) == 2 + assert res_utc[0]["date"] == "2023-12-31" + assert res_utc[1]["date"] == "2024-01-01" + + # Under America/New_York (UTC-5) both timestamps belong to 2023-12-31 + agg_est = UsageAggregator(data_path=str(tmp_path), timezone="America/New_York") + res_est = agg_est.aggregate_daily(entries) + assert len(res_est) == 1 + assert res_est[0]["date"] == "2023-12-31" + # Validate totals + assert res_est[0]["input_tokens"] == 300 + assert 
res_est[0]["output_tokens"] == 150 diff --git a/src/tests/test_history_manager.py b/src/tests/test_history_manager.py new file mode 100644 index 0000000..f907d8e --- /dev/null +++ b/src/tests/test_history_manager.py @@ -0,0 +1,441 @@ +"""Tests for HistoryManager module.""" + +import json +import tempfile +from datetime import datetime, timedelta +from pathlib import Path + +import pytest + +from claude_monitor.data.history_manager import HistoryManager + + +class TestHistoryManager: + """Test suite for HistoryManager.""" + + @pytest.fixture + def temp_dir(self) -> Path: + """Create a temporary directory for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + @pytest.fixture + def history_manager(self, temp_dir: Path) -> HistoryManager: + """Create a HistoryManager instance with temporary directory.""" + return HistoryManager(data_dir=temp_dir) + + def test_initialization( + self, history_manager: HistoryManager, temp_dir: Path + ) -> None: + """Test HistoryManager initialization.""" + assert history_manager.data_dir == temp_dir + assert history_manager.daily_dir == temp_dir / "daily" + assert history_manager.daily_dir.exists() + + def test_get_daily_file_path(self, history_manager: HistoryManager) -> None: + """Test file path generation for daily data.""" + date_str = "2024-12-15" + expected_path = history_manager.daily_dir / "2024" / "12" / "2024-12-15.json" + actual_path = history_manager._get_daily_file_path(date_str) + assert actual_path == expected_path + assert actual_path.parent.exists() + + def test_get_daily_file_path_invalid_date( + self, history_manager: HistoryManager + ) -> None: + """Test file path generation with invalid date format.""" + date_str = "invalid-date" + with pytest.raises(ValueError, match="Invalid date format"): + history_manager._get_daily_file_path(date_str) + + # Test path traversal attempt + malicious_str = "../../../etc/passwd" + with pytest.raises(ValueError, match="Invalid date format"): + history_manager._get_daily_file_path(malicious_str) + + def test_save_daily_data(self, history_manager: HistoryManager) -> None: + """Test saving daily data.""" + daily_data = [ + { + "date": "2024-12-15", + "input_tokens": 1000, + "output_tokens": 500, + "total_cost": 0.015, + "entries_count": 5, + "models_used": ["claude-3-opus"], + } + ] + + saved_count = history_manager.save_daily_data(daily_data) + assert saved_count == 1 + + # Check file was created + file_path = history_manager._get_daily_file_path("2024-12-15") + assert file_path.exists() + + # Verify content + with open(file_path, "r") as f: + saved_data = json.load(f) + assert saved_data == daily_data[0] + + def test_save_daily_data_no_overwrite( + self, history_manager: HistoryManager + ) -> None: + """Test that save_daily_data doesn't overwrite when existing data has more information.""" + daily_data = [ + { + "date": "2024-12-15", + "input_tokens": 2000, + "output_tokens": 1000, + "total_cost": 0.030, + "entries_count": 10, + } + ] + + # Save first time with more data + saved_count = history_manager.save_daily_data(daily_data) + assert saved_count == 1 + + # Try to save again with less data + less_data = [ + { + "date": "2024-12-15", + "input_tokens": 1000, + "output_tokens": 500, + "total_cost": 0.015, + "entries_count": 5, + } + ] + saved_count = history_manager.save_daily_data(less_data, overwrite=False) + assert saved_count == 0 # Should not save again + + # Verify original data is preserved + file_path = history_manager._get_daily_file_path("2024-12-15") + with 
open(file_path, "r") as f: + saved_data = json.load(f) + assert saved_data["input_tokens"] == 2000 + assert saved_data["output_tokens"] == 1000 + + def test_save_daily_data_with_overwrite( + self, history_manager: HistoryManager + ) -> None: + """Test save_daily_data with overwrite enabled.""" + daily_data = [ + { + "date": "2024-12-15", + "input_tokens": 1000, + "output_tokens": 500, + } + ] + + # Save first time + history_manager.save_daily_data(daily_data) + + # Modify and save with overwrite + daily_data[0]["input_tokens"] = 2000 + saved_count = history_manager.save_daily_data(daily_data, overwrite=True) + assert saved_count == 1 + + # Verify data was updated + file_path = history_manager._get_daily_file_path("2024-12-15") + with open(file_path, "r") as f: + saved_data = json.load(f) + assert saved_data["input_tokens"] == 2000 + + def test_load_historical_daily_data(self, history_manager: HistoryManager) -> None: + """Test loading historical daily data.""" + # Save some test data + test_data = [ + {"date": "2024-12-10", "input_tokens": 100}, + {"date": "2024-12-15", "input_tokens": 200}, + {"date": "2024-12-20", "input_tokens": 300}, + ] + + for data in test_data: + history_manager.save_daily_data([data]) + + # Load all data + loaded_data = history_manager.load_historical_daily_data() + assert len(loaded_data) == 3 + assert loaded_data[0]["date"] == "2024-12-10" + assert loaded_data[2]["date"] == "2024-12-20" + + def test_load_historical_daily_data_with_date_range( + self, history_manager: HistoryManager + ) -> None: + """Test loading historical data with date filters.""" + # Save test data + test_data = [ + {"date": "2024-12-10", "input_tokens": 100}, + {"date": "2024-12-15", "input_tokens": 200}, + {"date": "2024-12-20", "input_tokens": 300}, + ] + + for data in test_data: + history_manager.save_daily_data([data]) + + # Load data with date range + start_date = datetime(2024, 12, 12) + end_date = datetime(2024, 12, 18) + loaded_data = history_manager.load_historical_daily_data( + start_date=start_date, end_date=end_date + ) + + assert len(loaded_data) == 1 + assert loaded_data[0]["date"] == "2024-12-15" + + def test_load_historical_daily_data_days_back( + self, history_manager: HistoryManager + ) -> None: + """Test loading historical data with days_back parameter.""" + # Save test data with current date + today = datetime.now() + yesterday = today - timedelta(days=1) + week_ago = today - timedelta(days=7) + + test_data = [ + {"date": week_ago.strftime("%Y-%m-%d"), "input_tokens": 100}, + {"date": yesterday.strftime("%Y-%m-%d"), "input_tokens": 200}, + {"date": today.strftime("%Y-%m-%d"), "input_tokens": 300}, + ] + + for data in test_data: + history_manager.save_daily_data([data]) + + # Load last 3 days + loaded_data = history_manager.load_historical_daily_data(days_back=3) + assert len(loaded_data) == 2 # Should get yesterday and today + + def test_merge_with_current_data(self, history_manager: HistoryManager) -> None: + """Test merging current and historical data.""" + historical_data = [ + {"date": "2024-12-10", "input_tokens": 100, "source": "historical"}, + {"date": "2024-12-15", "input_tokens": 200, "source": "historical"}, + ] + + current_data = [ + {"date": "2024-12-15", "input_tokens": 250, "source": "current"}, + {"date": "2024-12-20", "input_tokens": 300, "source": "current"}, + ] + + merged = history_manager.merge_with_current_data(current_data, historical_data) + + assert len(merged) == 3 + assert merged[0]["date"] == "2024-12-10" + assert merged[0]["source"] == 
"historical" + assert merged[1]["date"] == "2024-12-15" + assert merged[1]["source"] == "current" # Current data takes precedence + assert merged[2]["date"] == "2024-12-20" + assert merged[2]["source"] == "current" + + def test_aggregate_monthly_from_daily( + self, history_manager: HistoryManager + ) -> None: + """Test aggregating daily data into monthly summaries.""" + daily_data = [ + { + "date": "2024-11-15", + "input_tokens": 100, + "output_tokens": 50, + "total_cost": 0.01, + "entries_count": 2, + "models_used": ["claude-3-opus"], + "model_breakdowns": { + "claude-3-opus": { + "input_tokens": 100, + "output_tokens": 50, + "cost": 0.01, + "count": 2, + } + }, + }, + { + "date": "2024-11-20", + "input_tokens": 200, + "output_tokens": 100, + "total_cost": 0.02, + "entries_count": 3, + "models_used": ["claude-3-sonnet"], + "model_breakdowns": { + "claude-3-sonnet": { + "input_tokens": 200, + "output_tokens": 100, + "cost": 0.02, + "count": 3, + } + }, + }, + { + "date": "2024-12-01", + "input_tokens": 300, + "output_tokens": 150, + "total_cost": 0.03, + "entries_count": 4, + "models_used": ["claude-3-opus"], + "model_breakdowns": { + "claude-3-opus": { + "input_tokens": 300, + "output_tokens": 150, + "cost": 0.03, + "count": 4, + } + }, + }, + ] + + monthly_data = history_manager.aggregate_monthly_from_daily(daily_data) + + assert len(monthly_data) == 2 + + # Check November aggregation + nov_data = monthly_data[0] + assert nov_data["month"] == "2024-11" + assert nov_data["input_tokens"] == 300 + assert nov_data["output_tokens"] == 150 + assert nov_data["total_cost"] == 0.03 + assert nov_data["entries_count"] == 5 + assert set(nov_data["models_used"]) == {"claude-3-opus", "claude-3-sonnet"} + + # Check December aggregation + dec_data = monthly_data[1] + assert dec_data["month"] == "2024-12" + assert dec_data["input_tokens"] == 300 + assert dec_data["output_tokens"] == 150 + assert dec_data["total_cost"] == 0.03 + assert dec_data["entries_count"] == 4 + + def test_cleanup_old_data(self, history_manager: HistoryManager) -> None: + """Test cleanup of old historical data.""" + # Create old and recent data + old_date = (datetime.now() - timedelta(days=400)).strftime("%Y-%m-%d") + recent_date = (datetime.now() - timedelta(days=10)).strftime("%Y-%m-%d") + + old_data = [{"date": old_date, "input_tokens": 100}] + recent_data = [{"date": recent_date, "input_tokens": 200}] + + history_manager.save_daily_data(old_data) + history_manager.save_daily_data(recent_data) + + # Cleanup data older than 365 days + deleted_count = history_manager.cleanup_old_data(days_to_keep=365) + + assert deleted_count == 1 + + # Verify old data is gone, recent data remains + old_file = history_manager._get_daily_file_path(old_date) + recent_file = history_manager._get_daily_file_path(recent_date) + + assert not old_file.exists() + assert recent_file.exists() + + def test_get_statistics(self, history_manager: HistoryManager) -> None: + """Test getting statistics about historical data.""" + # Save some test data + test_data = [ + {"date": "2024-11-15", "input_tokens": 100}, + {"date": "2024-12-15", "input_tokens": 200}, + {"date": "2024-12-20", "input_tokens": 300}, + ] + + for data in test_data: + history_manager.save_daily_data([data]) + + stats = history_manager.get_statistics() + + assert stats["total_files"] == 3 + assert stats["oldest_date"] == "2024-11-15" + assert stats["newest_date"] == "2024-12-20" + assert "total_size_mb" in stats + assert stats["data_directory"] == str(history_manager.daily_dir) + + def 
test_save_daily_data_missing_date( + self, history_manager: HistoryManager + ) -> None: + """Test handling of data without date field.""" + daily_data = [ + {"input_tokens": 1000, "output_tokens": 500} # Missing 'date' field + ] + + saved_count = history_manager.save_daily_data(daily_data) + assert saved_count == 0 + + def test_save_daily_data_with_existing_better_data( + self, history_manager: HistoryManager + ) -> None: + """Test that existing data with more total tokens is preserved.""" + # Save initial data with more total tokens + initial_data = [ + { + "date": "2024-12-15", + "input_tokens": 2000, + "output_tokens": 1000, + "cache_creation_tokens": 100, + "cache_read_tokens": 50, + "total_cost": 0.10, + "entries_count": 20, + } + ] + history_manager.save_daily_data(initial_data) + + # Try to save data with fewer total tokens + new_data = [ + { + "date": "2024-12-15", + "input_tokens": 500, + "output_tokens": 250, + "cache_creation_tokens": 50, + "cache_read_tokens": 25, + "total_cost": 0.05, + "entries_count": 10, + } + ] + saved_count = history_manager.save_daily_data(new_data, overwrite=False) + assert saved_count == 0 + + # Verify original data is preserved + file_path = history_manager._get_daily_file_path("2024-12-15") + with open(file_path, "r", encoding="utf-8") as f: + saved_data = json.load(f) + assert saved_data["input_tokens"] == 2000 + assert saved_data["total_cost"] == 0.10 + + def test_save_daily_data_updates_with_more_info( + self, history_manager: HistoryManager + ) -> None: + """Test that new data with more information replaces old data.""" + # Save initial data + initial_data = [ + { + "date": "2024-12-16", + "input_tokens": 1000, + "output_tokens": 500, + "cache_creation_tokens": 100, + "cache_read_tokens": 50, + "total_cost": 0.05, + "entries_count": 10, + } + ] + history_manager.save_daily_data(initial_data) + + # Save new data with more total tokens + new_data = [ + { + "date": "2024-12-16", + "input_tokens": 900, + "output_tokens": 600, + "cache_creation_tokens": 200, + "cache_read_tokens": 100, + "total_cost": 0.08, + "entries_count": 15, + } + ] + saved_count = history_manager.save_daily_data(new_data, overwrite=False) + assert saved_count == 1 + + # Verify new data was saved + file_path = history_manager._get_daily_file_path("2024-12-16") + with open(file_path, "r", encoding="utf-8") as f: + saved_data = json.load(f) + assert saved_data["input_tokens"] == 900 + assert saved_data["cache_creation_tokens"] == 200 + assert saved_data["total_cost"] == 0.08 diff --git a/src/tests/test_settings.py b/src/tests/test_settings.py index af6d534..ed6a1cf 100644 --- a/src/tests/test_settings.py +++ b/src/tests/test_settings.py @@ -666,3 +666,95 @@ def test_settings_customise_sources(self) -> None: # Should only return init_settings assert sources == ("init_settings",) + + def test_new_date_fields(self) -> None: + """Test new start_date and end_date fields.""" + # Test with valid dates + settings = Settings( + _cli_parse_args=[], + plan="pro", + start_date="2024-12-01", + end_date="2024-12-31", + ) + assert settings.start_date == "2024-12-01" + assert settings.end_date == "2024-12-31" + + # Test with None (optional fields) + settings = Settings(_cli_parse_args=[], plan="pro") + assert settings.start_date is None + assert settings.end_date is None + + # Test different date formats + settings = Settings( + _cli_parse_args=[], + plan="pro", + start_date="2024.12.01", + end_date="2024/12/31", + ) + assert settings.start_date == "2024.12.01" + assert settings.end_date == 
"2024/12/31" + + def test_history_field(self) -> None: + """Test history field with all valid values.""" + valid_modes = ["auto", "off", "readonly", "writeonly"] + + for mode in valid_modes: + settings = Settings(_cli_parse_args=[], plan="pro", history=mode) + assert settings.history == mode + + # Test default value + settings = Settings(_cli_parse_args=[], plan="pro") + assert settings.history == "auto" + + # Test case insensitive validation + settings = Settings(_cli_parse_args=[], plan="pro", history="AUTO") + assert settings.history == "auto" + + settings = Settings(_cli_parse_args=[], plan="pro", history="ReadOnly") + assert settings.history == "readonly" + + # Test invalid value + with pytest.raises(ValueError, match="Invalid history mode"): + Settings(_cli_parse_args=[], plan="pro", history="invalid_mode") + + def test_validate_history_validator(self) -> None: + """Test the validate_history field validator.""" + # Test with string input + result = Settings.validate_history("WRITEONLY") + assert result == "writeonly" + + result = Settings.validate_history("Off") + assert result == "off" + + # Test invalid string + with pytest.raises(ValueError, match="Invalid history mode"): + Settings.validate_history("bad_mode") + + def test_to_namespace_includes_new_fields(self) -> None: + """Test that new fields are included in to_namespace.""" + settings = Settings( + _cli_parse_args=[], + plan="pro", + start_date="2024-12-01", + end_date="2024-12-31", + history="readonly", + ) + + namespace = settings.to_namespace() + + assert hasattr(namespace, "start_date") + assert namespace.start_date == "2024-12-01" + + assert hasattr(namespace, "end_date") + assert namespace.end_date == "2024-12-31" + + assert hasattr(namespace, "history") + assert namespace.history == "readonly" + + # Test with defaults + settings = Settings(_cli_parse_args=[], plan="pro") + namespace = settings.to_namespace() + + assert namespace.start_date is None + assert namespace.end_date is None + assert namespace.history == "auto"