2 changes: 2 additions & 0 deletions src/claude_monitor/cli/main.py
@@ -168,6 +168,7 @@ def _run_monitoring(args: argparse.Namespace) -> None:
args.refresh_rate if hasattr(args, "refresh_rate") else 10
),
data_path=str(data_path),
+data_source=getattr(args, "data_source", "auto"),
)
orchestrator.set_args(args)

@@ -390,6 +391,7 @@ def _run_table_view(
data_path=str(data_path),
aggregation_mode=view_mode,
timezone=args.timezone,
+data_source=getattr(args, "data_source", "auto"),
)

# Create table controller
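Both call sites read the new option defensively via getattr, so older argparse namespaces that lack the attribute keep working. A minimal illustrative sketch of that fallback (the --data-source flag itself is assumed to be registered elsewhere in the PR):

```python
import argparse

# Older namespace without the new attribute: falls back to "auto".
legacy_args = argparse.Namespace(plan="pro")
assert getattr(legacy_args, "data_source", "auto") == "auto"

# Namespace carrying the new attribute: the value is passed through unchanged.
new_args = argparse.Namespace(plan="pro", data_source="opencode")
assert getattr(new_args, "data_source", "auto") == "opencode"
```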
21 changes: 21 additions & 0 deletions src/claude_monitor/core/settings.py
@@ -34,6 +34,7 @@ def save(self, settings: "Settings") -> None:
"refresh_rate": settings.refresh_rate,
"reset_hour": settings.reset_hour,
"view": settings.view,
"data_source": settings.data_source,
"timestamp": datetime.now().isoformat(),
}

@@ -109,6 +110,11 @@ class Settings(BaseSettings):
description="View mode (realtime, daily, monthly, session)",
)

+data_source: Literal["auto", "claude", "opencode"] = Field(
+default="auto",
+description="Data source (auto, claude, opencode). Auto-detects available source.",
+)

@staticmethod
def _get_system_timezone() -> str:
"""Lazy import to avoid circular dependencies."""
@@ -198,6 +204,20 @@ def validate_view(cls, v: Any) -> str:
)
return v

@field_validator("data_source", mode="before")
@classmethod
def validate_data_source(cls, v: Any) -> str:
"""Validate and normalize data source value."""
if isinstance(v, str):
v_lower = v.lower()
valid_sources = ["auto", "claude", "opencode"]
if v_lower in valid_sources:
return v_lower
raise ValueError(
f"Invalid data source: {v}. Must be one of: {', '.join(valid_sources)}"
)
return v

@field_validator("theme", mode="before")
@classmethod
def validate_theme(cls, v: Any) -> str:
@@ -350,5 +370,6 @@ def to_namespace(self) -> argparse.Namespace:
args.log_level = self.log_level
args.log_file = str(self.log_file) if self.log_file else None
args.version = self.version
+args.data_source = self.data_source

return args
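A hedged sketch of what the new field and validator are meant to do, assuming a Settings instance can be built with defaults for its other fields:

```python
from pydantic import ValidationError

from claude_monitor.core.settings import Settings

# Mixed case is normalized to lowercase by validate_data_source.
assert Settings(data_source="OpenCode").data_source == "opencode"

# Values outside the allowed set raise a validation error.
try:
    Settings(data_source="cursor")
except ValidationError as err:
    print(err)  # Invalid data source: cursor. Must be one of: auto, claude, opencode
```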
26 changes: 23 additions & 3 deletions src/claude_monitor/data/__init__.py
@@ -1,4 +1,24 @@
"""Data package for Claude Monitor."""
"""Data package for Claude Monitor.

-# Import directly from modules without facade
-__all__: list[str] = []
+Provides data loading from multiple sources:
+- Claude Code (~/.claude/projects/*.jsonl)
+- OpenCode (~/.local/share/opencode/storage/message/*.json)
+"""

+from claude_monitor.data.reader import (
+DataSource,
+detect_available_sources,
+detect_data_source,
+get_data_source_info,
+load_usage_entries,
+load_usage_entries_unified,
+)

+__all__ = [
+"DataSource",
+"detect_available_sources",
+"detect_data_source",
+"get_data_source_info",
+"load_usage_entries",
+"load_usage_entries_unified",
+]
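reader.py itself is not part of this excerpt. Judging from how these exports are used in aggregator.py and analysis.py below, DataSource is presumably an enum along the following lines, and load_usage_entries_unified returns a three-tuple of entries, raw entries, and the detected source. A sketch of that assumed shape:

```python
from enum import Enum

class DataSource(Enum):
    AUTO = "auto"          # pick whichever source has usable data
    ALL = "all"            # merge entries from every detected source
    CLAUDE = "claude"      # ~/.claude/projects/*.jsonl
    OPENCODE = "opencode"  # ~/.local/share/opencode/storage/message/*.json

# Assumed call shape, inferred from the call sites below:
# entries, raw_entries, detected_source = load_usage_entries_unified(
#     data_path=None, hours_back=24, source=DataSource.AUTO, include_raw=False
# )
```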
27 changes: 23 additions & 4 deletions src/claude_monitor/data/aggregator.py
@@ -93,18 +93,24 @@ class UsageAggregator:
"""Aggregates usage data for daily and monthly reports."""

def __init__(
-self, data_path: str, aggregation_mode: str = "daily", timezone: str = "UTC"
+self,
+data_path: str,
+aggregation_mode: str = "daily",
+timezone: str = "UTC",
+data_source: str = "auto",
):
"""Initialize the aggregator.

Args:
data_path: Path to the data directory
aggregation_mode: Mode of aggregation ('daily' or 'monthly')
timezone: Timezone string for date formatting
+data_source: Data source to use ("auto", "claude", "opencode")
"""
self.data_path = data_path
self.aggregation_mode = aggregation_mode
self.timezone = timezone
+self.data_source = data_source
self.timezone_handler = TimezoneHandler()

def _aggregate_by_period(
@@ -272,12 +278,25 @@ def aggregate(self) -> List[Dict[str, Any]]:
Returns:
List of aggregated data based on aggregation_mode
"""
-from claude_monitor.data.reader import load_usage_entries
+from claude_monitor.data.reader import DataSource, load_usage_entries_unified

logger.info(f"Starting aggregation in {self.aggregation_mode} mode")

-# Load usage entries
-entries, _ = load_usage_entries(data_path=self.data_path)
+# Convert string source to DataSource enum
+source_map = {
+"auto": DataSource.AUTO,
+"all": DataSource.ALL,
+"claude": DataSource.CLAUDE,
+"opencode": DataSource.OPENCODE,
+}
+source_enum = source_map.get(self.data_source.lower(), DataSource.AUTO)

+# Load usage entries from all available sources
+entries, _, detected_source = load_usage_entries_unified(
+data_path=self.data_path,
+source=source_enum,
+)
+logger.info(f"Loaded {len(entries)} entries from {detected_source.value}")

if not entries:
logger.warning("No usage entries found")
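An illustrative use of the extended aggregator; the path, timezone, and mode below are placeholders:

```python
from claude_monitor.data.aggregator import UsageAggregator

aggregator = UsageAggregator(
    data_path="~/.claude/projects",   # placeholder path
    aggregation_mode="monthly",
    timezone="Europe/Warsaw",
    data_source="auto",               # or "all", "claude", "opencode"
)
rows = aggregator.aggregate()         # list of per-period dicts, as before
```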
28 changes: 24 additions & 4 deletions src/claude_monitor/data/analysis.py
@@ -10,7 +10,7 @@
from claude_monitor.core.calculations import BurnRateCalculator
from claude_monitor.core.models import CostMode, SessionBlock, UsageEntry
from claude_monitor.data.analyzer import SessionAnalyzer
-from claude_monitor.data.reader import load_usage_entries
+from claude_monitor.data.reader import DataSource, load_usage_entries_unified

logger = logging.getLogger(__name__)

@@ -20,6 +20,7 @@ def analyze_usage(
use_cache: bool = True,
quick_start: bool = False,
data_path: Optional[str] = None,
+data_source: str = "auto",
) -> Dict[str, Any]:
"""
Main entry point to generate response_final.json.
@@ -35,13 +36,14 @@
use_cache: Use cached data when available
quick_start: Use minimal data for quick startup (last 24h only)
data_path: Optional path to Claude data directory
+data_source: Data source to use ("auto", "claude", "opencode")

Returns:
Dictionary with analyzed blocks
"""
logger.info(
f"analyze_usage called with hours_back={hours_back}, use_cache={use_cache}, "
f"quick_start={quick_start}, data_path={data_path}"
f"quick_start={quick_start}, data_path={data_path}, data_source={data_source}"
)

if quick_start and hours_back is None:
@@ -50,15 +52,32 @@
elif quick_start:
logger.info(f"Quick start mode: loading last {hours_back} hours")

+# Convert string source to DataSource enum
+source_map = {
+"auto": DataSource.AUTO,
+"all": DataSource.ALL,
+"claude": DataSource.CLAUDE,
+"opencode": DataSource.OPENCODE,
+}
+data_source_lower = data_source.lower()
+source_enum = source_map.get(data_source_lower)
+if source_enum is None:
+logger.warning(f"Unknown data_source '{data_source}', defaulting to 'auto'")
+source_enum = DataSource.AUTO

start_time = datetime.now()
-entries, raw_entries = load_usage_entries(
+entries, raw_entries, detected_source = load_usage_entries_unified(
data_path=data_path,
hours_back=hours_back,
mode=CostMode.AUTO,
include_raw=True,
+source=source_enum,
)
load_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Data loaded in {load_time:.3f}s")
logger.info(
f"Data loaded in {load_time:.3f}s from {detected_source.value} "
f"({len(entries)} entries)"
)

start_time = datetime.now()
analyzer = SessionAnalyzer(session_duration_hours=5)
@@ -93,6 +112,7 @@ def analyze_usage(
"transform_time_seconds": transform_time,
"cache_used": use_cache,
"quick_start": quick_start,
"data_source": detected_source.value,
}

result = _create_result(blocks, entries, metadata)
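An illustrative call of the extended entry point. How _create_result exposes the metadata is not shown in this diff, so the final lookup is an assumption:

```python
from claude_monitor.data.analysis import analyze_usage

result = analyze_usage(hours_back=24, data_source="auto")

# Assuming _create_result keeps the metadata dict under a "metadata" key,
# the source that was actually read can then be inspected:
print(result.get("metadata", {}).get("data_source"))  # e.g. "claude" or "opencode"
```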