Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/claude_monitor/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def _run_monitoring(args: argparse.Namespace) -> None:
args.refresh_rate if hasattr(args, "refresh_rate") else 10
),
data_path=str(data_path),
data_source=getattr(args, "data_source", "auto"),
)
orchestrator.set_args(args)

Expand Down Expand Up @@ -390,6 +391,7 @@ def _run_table_view(
data_path=str(data_path),
aggregation_mode=view_mode,
timezone=args.timezone,
data_source=getattr(args, "data_source", "auto"),
)

# Create table controller
Expand Down
20 changes: 20 additions & 0 deletions src/claude_monitor/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ class Settings(BaseSettings):
description="View mode (realtime, daily, monthly, session)",
)

data_source: Literal["auto", "claude", "opencode"] = Field(
default="auto",
description="Data source (auto, claude, opencode). Auto-detects available source.",
)

@staticmethod
def _get_system_timezone() -> str:
"""Lazy import to avoid circular dependencies."""
Expand Down Expand Up @@ -198,6 +203,20 @@ def validate_view(cls, v: Any) -> str:
)
return v

@field_validator("data_source", mode="before")
@classmethod
def validate_data_source(cls, v: Any) -> str:
"""Validate and normalize data source value."""
if isinstance(v, str):
v_lower = v.lower()
valid_sources = ["auto", "claude", "opencode"]
if v_lower in valid_sources:
return v_lower
raise ValueError(
f"Invalid data source: {v}. Must be one of: {', '.join(valid_sources)}"
)
return v

@field_validator("theme", mode="before")
@classmethod
def validate_theme(cls, v: Any) -> str:
Expand Down Expand Up @@ -350,5 +369,6 @@ def to_namespace(self) -> argparse.Namespace:
args.log_level = self.log_level
args.log_file = str(self.log_file) if self.log_file else None
args.version = self.version
args.data_source = self.data_source

return args
26 changes: 23 additions & 3 deletions src/claude_monitor/data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
"""Data package for Claude Monitor."""
"""Data package for Claude Monitor.

# Import directly from modules without facade
__all__: list[str] = []
Provides data loading from multiple sources:
- Claude Code (~/.claude/projects/*.jsonl)
- OpenCode (~/.local/share/opencode/storage/message/*.json)
"""

from claude_monitor.data.reader import (
DataSource,
detect_available_sources,
detect_data_source,
get_data_source_info,
load_usage_entries,
load_usage_entries_unified,
)

__all__ = [
"DataSource",
"detect_available_sources",
"detect_data_source",
"get_data_source_info",
"load_usage_entries",
"load_usage_entries_unified",
]
27 changes: 23 additions & 4 deletions src/claude_monitor/data/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,24 @@ class UsageAggregator:
"""Aggregates usage data for daily and monthly reports."""

def __init__(
self, data_path: str, aggregation_mode: str = "daily", timezone: str = "UTC"
self,
data_path: str,
aggregation_mode: str = "daily",
timezone: str = "UTC",
data_source: str = "auto",
):
"""Initialize the aggregator.

Args:
data_path: Path to the data directory
aggregation_mode: Mode of aggregation ('daily' or 'monthly')
timezone: Timezone string for date formatting
data_source: Data source to use ("auto", "claude", "opencode")
"""
self.data_path = data_path
self.aggregation_mode = aggregation_mode
self.timezone = timezone
self.data_source = data_source
self.timezone_handler = TimezoneHandler()

def _aggregate_by_period(
Expand Down Expand Up @@ -272,12 +278,25 @@ def aggregate(self) -> List[Dict[str, Any]]:
Returns:
List of aggregated data based on aggregation_mode
"""
from claude_monitor.data.reader import load_usage_entries
from claude_monitor.data.reader import DataSource, load_usage_entries_unified

logger.info(f"Starting aggregation in {self.aggregation_mode} mode")

# Load usage entries
entries, _ = load_usage_entries(data_path=self.data_path)
# Convert string source to DataSource enum
source_map = {
"auto": DataSource.AUTO,
"all": DataSource.ALL,
"claude": DataSource.CLAUDE,
"opencode": DataSource.OPENCODE,
}
source_enum = source_map.get(self.data_source.lower(), DataSource.AUTO)

# Load usage entries from all available sources
entries, _, detected_source = load_usage_entries_unified(
data_path=self.data_path,
source=source_enum,
)
logger.info(f"Loaded {len(entries)} entries from {detected_source.value}")

if not entries:
logger.warning("No usage entries found")
Expand Down
24 changes: 20 additions & 4 deletions src/claude_monitor/data/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from claude_monitor.core.calculations import BurnRateCalculator
from claude_monitor.core.models import CostMode, SessionBlock, UsageEntry
from claude_monitor.data.analyzer import SessionAnalyzer
from claude_monitor.data.reader import load_usage_entries
from claude_monitor.data.reader import DataSource, load_usage_entries_unified

logger = logging.getLogger(__name__)

Expand All @@ -20,6 +20,7 @@ def analyze_usage(
use_cache: bool = True,
quick_start: bool = False,
data_path: Optional[str] = None,
data_source: str = "auto",
) -> Dict[str, Any]:
"""
Main entry point to generate response_final.json.
Expand All @@ -35,13 +36,14 @@ def analyze_usage(
use_cache: Use cached data when available
quick_start: Use minimal data for quick startup (last 24h only)
data_path: Optional path to Claude data directory
data_source: Data source to use ("auto", "claude", "opencode")

Returns:
Dictionary with analyzed blocks
"""
logger.info(
f"analyze_usage called with hours_back={hours_back}, use_cache={use_cache}, "
f"quick_start={quick_start}, data_path={data_path}"
f"quick_start={quick_start}, data_path={data_path}, data_source={data_source}"
)

if quick_start and hours_back is None:
Expand All @@ -50,15 +52,28 @@ def analyze_usage(
elif quick_start:
logger.info(f"Quick start mode: loading last {hours_back} hours")

# Convert string source to DataSource enum
source_map = {
"auto": DataSource.AUTO,
"all": DataSource.ALL,
"claude": DataSource.CLAUDE,
"opencode": DataSource.OPENCODE,
}
source_enum = source_map.get(data_source.lower(), DataSource.AUTO)

start_time = datetime.now()
entries, raw_entries = load_usage_entries(
entries, raw_entries, detected_source = load_usage_entries_unified(
data_path=data_path,
hours_back=hours_back,
mode=CostMode.AUTO,
include_raw=True,
source=source_enum,
)
load_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Data loaded in {load_time:.3f}s")
logger.info(
f"Data loaded in {load_time:.3f}s from {detected_source.value} "
f"({len(entries)} entries)"
)

start_time = datetime.now()
analyzer = SessionAnalyzer(session_duration_hours=5)
Expand Down Expand Up @@ -93,6 +108,7 @@ def analyze_usage(
"transform_time_seconds": transform_time,
"cache_used": use_cache,
"quick_start": quick_start,
"data_source": detected_source.value,
}

result = _create_result(blocks, entries, metadata)
Expand Down
Loading