153 changes: 151 additions & 2 deletions src/claude_monitor/cli/main.py
@@ -385,6 +385,38 @@ def _run_table_view(
logger = logging.getLogger(__name__)

try:
# Parse date filters early so they can be used for both current and historical data
from datetime import datetime

from claude_monitor.utils.time_utils import TimezoneHandler

tz = TimezoneHandler(args.timezone)

def _parse_date(date_str: Optional[str]):
if not date_str:
return None
for fmt in ("%Y-%m-%d", "%Y.%m.%d", "%Y/%m/%d"):
try:
return tz.ensure_timezone(datetime.strptime(date_str, fmt))
except ValueError:
continue
print_themed(
f"Invalid date format: {date_str}. Use one of: YYYY-MM-DD, YYYY.MM.DD, YYYY/MM/DD.",
style="warning",
)
return None

start_dt = _parse_date(getattr(args, "start_date", None))
end_dt = _parse_date(getattr(args, "end_date", None))

# Validate date range
if start_dt and end_dt and start_dt > end_dt:
print_themed(
f"Error: start_date ({getattr(args, 'start_date', None)}) must be on or before end_date ({getattr(args, 'end_date', None)})",
style="error",
)
return

# Create aggregator with appropriate mode
aggregator = UsageAggregator(
data_path=str(data_path),
@@ -395,9 +427,126 @@ def _run_table_view(
# Create table controller
controller = TableViewsController(console=console)

# Get aggregated data
# Get aggregated data with date filters
logger.info(f"Loading {view_mode} usage data...")
aggregated_data = aggregator.aggregate()
aggregated_data = aggregator.aggregate(start_dt, end_dt)

# Initialize history manager for daily and monthly views
history_mode = getattr(args, "history", "auto")
if history_mode != "off":
from claude_monitor.data.history_manager import HistoryManager

history_manager = HistoryManager()

if view_mode == "daily":
# Load historical data if in auto or readonly mode
if history_mode in ["auto", "readonly"]:
# Load historical data using the same date filters
historical_data = history_manager.load_historical_daily_data(
start_date=start_dt,
end_date=end_dt, # history_manager uses inclusive dates
)

if historical_data:
print_themed(
f"Loaded {len(historical_data)} days from history",
style="info",
)

# Merge with current data
aggregated_data = history_manager.merge_with_current_data(
aggregated_data, historical_data
)
print_themed(
f"Displaying {len(aggregated_data)} total days",
style="info",
)

# Save current data to history if in auto or writeonly mode
if aggregated_data and history_mode in ["auto", "writeonly"]:
saved_count = history_manager.save_daily_data(aggregated_data)
if saved_count > 0:
print_themed(
f"Saved {saved_count} days to history", style="success"
)

elif view_mode == "monthly":
# For monthly view, always work with daily data and aggregate to monthly
if history_mode in ["auto", "readonly"]:
# Get current daily data first
daily_aggregator = UsageAggregator(
data_path=str(data_path),
aggregation_mode="daily",
timezone=args.timezone,
)
current_daily = daily_aggregator.aggregate(start_dt, end_dt)

# Load historical daily data
daily_historical = history_manager.load_historical_daily_data(
start_date=start_dt,
end_date=end_dt, # history_manager uses inclusive dates
)

# Save current daily data to history in auto or writeonly mode
if history_mode in ["auto", "writeonly"] and current_daily:
saved = history_manager.save_daily_data(current_daily)
if saved > 0:
print_themed(
f"Saved {saved} days to history", style="success"
)

# Merge current and historical daily data
all_daily = []
if current_daily and daily_historical:
all_daily = history_manager.merge_with_current_data(
current_daily, daily_historical
)
# Show data source composition
current_dates = {d.get("date") for d in current_daily}
historical_dates = {d.get("date") for d in daily_historical}
from_current = len(current_dates)
from_history_only = len(historical_dates - current_dates)

if from_history_only > 0:
print_themed(
f"Loaded {len(all_daily)} days total ({from_current} from current session, {from_history_only} from history)",
style="info",
)
else:
print_themed(
f"Loaded {len(all_daily)} days from current session",
style="info",
)
elif current_daily:
all_daily = current_daily
print_themed(
f"Using {len(current_daily)} current days", style="info"
)
elif daily_historical:
all_daily = daily_historical
print_themed(
f"Using {len(daily_historical)} historical days",
style="info",
)

# Always aggregate daily data into monthly
if all_daily:
monthly_from_daily = (
history_manager.aggregate_monthly_from_daily(all_daily)
)

if monthly_from_daily:
# Replace the initial aggregated_data with the one from daily
aggregated_data = monthly_from_daily
print_themed(
f"Displaying {len(aggregated_data)} months aggregated from {len(all_daily)} days",
style="info",
)
else:
print_themed(
"No monthly data could be aggregated from daily data",
style="warning",
)

if not aggregated_data:
print_themed(f"No usage data found for {view_mode} view", style="warning")
32 changes: 32 additions & 0 deletions src/claude_monitor/core/settings.py
@@ -138,6 +138,16 @@ def _get_system_time_format() -> str:
description="Display theme (light, dark, classic, auto)",
)

start_date: Optional[str] = Field(
default=None,
description="Start date for filtering data (formats: YYYY-MM-DD, YYYY.MM.DD, YYYY/MM/DD)",
)

end_date: Optional[str] = Field(
default=None,
description="End date for filtering data (formats: YYYY-MM-DD, YYYY.MM.DD, YYYY/MM/DD)",
)

custom_limit_tokens: Optional[int] = Field(
default=None, gt=0, description="Token limit for custom plan"
)
@@ -170,6 +180,11 @@ def _get_system_time_format() -> str:

clear: bool = Field(default=False, description="Clear saved configuration")

history: Literal["auto", "off", "readonly", "writeonly"] = Field(
default="auto",
description="History mode: auto (save+load), off (disable), readonly (load only), writeonly (save only)",
)

@field_validator("plan", mode="before")
@classmethod
def validate_plan(cls, v: Any) -> str:
@@ -240,6 +255,20 @@ def validate_log_level(cls, v: str) -> str:
raise ValueError(f"Invalid log level: {v}")
return v_upper

@field_validator("history", mode="before")
@classmethod
def validate_history(cls, v: Any) -> str:
"""Validate and normalize history mode value."""
if isinstance(v, str):
v_lower = v.lower()
valid_modes = ["auto", "off", "readonly", "writeonly"]
if v_lower in valid_modes:
return v_lower
raise ValueError(
f"Invalid history mode: {v}. Must be one of: {', '.join(valid_modes)}"
)
return v

@classmethod
def settings_customise_sources(
cls,
@@ -350,5 +379,8 @@ def to_namespace(self) -> argparse.Namespace:
args.log_level = self.log_level
args.log_file = str(self.log_file) if self.log_file else None
args.version = self.version
args.start_date = self.start_date
args.end_date = self.end_date
args.history = self.history

return args
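
The validate_history hook above lower-cases its input and rejects anything outside the four supported modes. A minimal, self-contained sketch of that behavior (HistoryDemo is an illustrative stand-in, not the real settings class):

from typing import Any, Literal

from pydantic import BaseModel, field_validator


class HistoryDemo(BaseModel):
    """Illustrative stand-in mirroring the history field and validator above."""

    history: Literal["auto", "off", "readonly", "writeonly"] = "auto"

    @field_validator("history", mode="before")
    @classmethod
    def _normalize(cls, v: Any) -> str:
        # Same normalization as the validate_history hook above: lower-case
        # strings and reject anything outside the four supported modes.
        if isinstance(v, str):
            v_lower = v.lower()
            if v_lower in ("auto", "off", "readonly", "writeonly"):
                return v_lower
            raise ValueError(f"Invalid history mode: {v}")
        return v


print(HistoryDemo(history="ReadOnly").history)  # -> "readonly"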
61 changes: 47 additions & 14 deletions src/claude_monitor/data/aggregator.py
@@ -7,7 +7,7 @@
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

from claude_monitor.core.models import SessionBlock, UsageEntry, normalize_model_name
@@ -105,7 +105,9 @@ def __init__(
self.data_path = data_path
self.aggregation_mode = aggregation_mode
self.timezone = timezone
self.timezone_handler = TimezoneHandler()
# Initialize handler with the user-selected timezone so subsequent
# conversions and localizations use it consistently.
self.timezone_handler = TimezoneHandler(timezone)

def _aggregate_by_period(
self,
@@ -121,23 +123,48 @@ def _aggregate_by_period(
entries: List of usage entries
period_key_func: Function to extract period key from timestamp
period_type: Type of period ('date' or 'month')
start_date: Optional start date filter
end_date: Optional end date filter
start_date: Optional start date filter (inclusive)
end_date: Optional end date filter (inclusive - includes the whole day)

Returns:
List of aggregated data dictionaries

Note:
Both start_date and end_date are inclusive. If end_date is provided,
all entries from that entire day are included (up to 23:59:59.999999).
"""
period_data: Dict[str, AggregatedPeriod] = {}

# Normalize filter boundaries into the configured timezone for
# consistent, intuitive "whole-day inclusive" semantics.
norm_start = (
self.timezone_handler.to_timezone(start_date, self.timezone)
if start_date
else None
)
norm_end = (
self.timezone_handler.to_timezone(end_date, self.timezone)
if end_date
else None
)

for entry in entries:
# Apply date filters
if start_date and entry.timestamp < start_date:
continue
if end_date and entry.timestamp > end_date:
# Convert entry timestamp to the configured timezone for filtering
# and period-key extraction.
ts_local = self.timezone_handler.to_timezone(entry.timestamp, self.timezone)

# Apply date filters (inclusive boundaries in local timezone)
if norm_start and ts_local < norm_start:
continue
# For end_date, include all entries up to the end of that day.
# Exclude entries >= next day's midnight in local timezone.
if norm_end:
next_day = norm_end + timedelta(days=1)
if ts_local >= next_day:
continue

Comment on lines +140 to 167
⚠️ Potential issue

Bug: end_date with time-of-day over-includes the next day. Normalize to day-start before +1 day.

Currently next_day = norm_end + 1 day. If norm_end carries a time-of-day such as 23:59:59, entries from almost the entire following day (up to that same time) are included. Normalize start/end to the start of their days, then use an exclusive upper bound of end_day_start + 1 day.

-        norm_start = (
-            self.timezone_handler.to_timezone(start_date, self.timezone)
-            if start_date
-            else None
-        )
-        norm_end = (
-            self.timezone_handler.to_timezone(end_date, self.timezone)
-            if end_date
-            else None
-        )
+        norm_start = (
+            self.timezone_handler.to_timezone(start_date, self.timezone)
+            if start_date
+            else None
+        )
+        norm_end = (
+            self.timezone_handler.to_timezone(end_date, self.timezone)
+            if end_date
+            else None
+        )
+        # Day-normalized inclusive bounds in local time
+        start_bound = (
+            norm_start.replace(hour=0, minute=0, second=0, microsecond=0)
+            if norm_start
+            else None
+        )
+        end_exclusive = (
+            norm_end.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
+            if norm_end
+            else None
+        )
@@
-            # Apply date filters (inclusive boundaries in local timezone)
-            if norm_start and ts_local < norm_start:
+            # Apply date filters (inclusive by calendar day)
+            if start_bound and ts_local < start_bound:
                 continue
-            # For end_date, include all entries up to the end of that day.
-            # Exclude entries >= next day's midnight in local timezone.
-            if norm_end:
-                next_day = norm_end + timedelta(days=1)
-                if ts_local >= next_day:
-                    continue
+            # Exclude entries on or after start_of_day(end_date + 1)
+            if end_exclusive and ts_local >= end_exclusive:
+                continue
#!/bin/bash
# Expect to see only two days (2024-01-15 and 2024-01-31) in tests;
# grep current logic and test inputs to confirm potential overshoot.
rg -n -C2 'next_day\s*=\s*norm_end\s*\+\s*timedelta\(days=1\)' src/claude_monitor/data/aggregator.py
rg -n -C2 'aggregate_daily\(.+end_date' src/tests | sed -n '1,120p'
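
To make the overshoot concrete, here is a small runnable sketch; the timestamps and timezone are invented for illustration, and the variable names follow the diff:

from datetime import datetime, timedelta, timezone

tz = timezone.utc  # assumed local timezone for the example

# An end_date that arrives with a time-of-day component.
norm_end = datetime(2024, 1, 31, 23, 59, 59, tzinfo=tz)
entry_ts = datetime(2024, 2, 1, 12, 0, tzinfo=tz)  # should be excluded

# Current logic: next_day lands at 2024-02-01 23:59:59, so the entry slips through.
next_day = norm_end + timedelta(days=1)
print(entry_ts < next_day)  # True -> wrongly included

# Suggested fix: normalize to the start of the day, then add one day for an exclusive bound.
end_exclusive = norm_end.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
print(entry_ts < end_exclusive)  # False -> correctly excluded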

# Get period key
period_key = period_key_func(entry.timestamp)
# Get period key using local time
period_key = period_key_func(ts_local)

# Get or create period aggregate
if period_key not in period_data:
@@ -266,9 +293,15 @@ def calculate_totals(self, aggregated_data: List[Dict[str, Any]]) -> Dict[str, A
"entries_count": total_stats.count,
}

def aggregate(self) -> List[Dict[str, Any]]:
def aggregate(
self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""Main aggregation method that reads data and returns aggregated results.

Args:
start_date: Optional start date filter
end_date: Optional end date filter

Returns:
List of aggregated data based on aggregation_mode
"""
@@ -288,10 +321,10 @@ def aggregate(self) -> List[Dict[str, Any]]:
if entry.timestamp.tzinfo is None:
entry.timestamp = self.timezone_handler.ensure_timezone(entry.timestamp)

# Aggregate based on mode
# Aggregate based on mode with date filters
if self.aggregation_mode == "daily":
return self.aggregate_daily(entries)
return self.aggregate_daily(entries, start_date, end_date)
elif self.aggregation_mode == "monthly":
return self.aggregate_monthly(entries)
return self.aggregate_monthly(entries, start_date, end_date)
else:
raise ValueError(f"Invalid aggregation mode: {self.aggregation_mode}")