Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion codeframe/persistence/repositories/token_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Extracted from monolithic Database class for better maintainability.
"""

from datetime import datetime
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Dict, Any, TYPE_CHECKING
import logging

Expand Down Expand Up @@ -274,6 +274,83 @@ def get_workspace_token_usage(
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]

def get_costs_summary(self, days: int) -> Dict[str, Any]:
    """Summarize token_usage spend over a trailing window of calendar days.

    Args:
        days: Size of the trailing window in days; must be positive.

    Returns:
        Dictionary with keys:
            total_spend_usd: float — sum of estimated_cost_usd in window
            total_tasks: int — distinct task_id count (excludes NULL)
            avg_cost_per_task: float — total_spend_usd / total_tasks (0 if no tasks)
            daily: list of {"date": "YYYY-MM-DD", "cost_usd": float}
                — one entry per UTC day in the window, oldest first,
                zero-filled for days with no spend.

    Raises:
        ValueError: If ``days`` is not positive.
    """
    if days <= 0:
        raise ValueError("days must be a positive integer")

    # The window is [midnight UTC, `days - 1` days ago .. midnight UTC
    # tomorrow). Bounds use a space-separated, offset-free format so plain
    # lexicographic comparison works against both `CURRENT_TIMESTAMP`
    # defaults ("YYYY-MM-DD HH:MM:SS") and Python `.isoformat()` outputs
    # ("YYYY-MM-DDTHH:MM:SS+00:00").
    today = datetime.now(timezone.utc).date()
    first_day = today - timedelta(days=days - 1)
    lower_bound = first_day.strftime("%Y-%m-%d %H:%M:%S")
    # Exclusive upper bound = midnight after today, so the daily chart and
    # the KPI totals always cover the same rows even when some records are
    # future-dated (clock skew, bad seed data).
    upper_bound = (today + timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")

    cur = self.conn.cursor()

    # Window totals. Spend includes NULL-task rows so it matches the chart;
    # the task count only considers rows actually linked to a task.
    cur.execute(
        """
        SELECT
            COALESCE(SUM(estimated_cost_usd), 0.0) AS total_spend,
            COUNT(DISTINCT CASE WHEN task_id IS NOT NULL THEN task_id END) AS task_count
        FROM token_usage
        WHERE timestamp >= ? AND timestamp < ?
        """,
        (lower_bound, upper_bound),
    )
    totals_row = cur.fetchone()
    spend = float(totals_row["total_spend"] or 0.0)
    task_total = int(totals_row["task_count"] or 0)

    # Per-day buckets keyed by UTC calendar date.
    cur.execute(
        """
        SELECT
            DATE(timestamp) AS day,
            COALESCE(SUM(estimated_cost_usd), 0.0) AS cost
        FROM token_usage
        WHERE timestamp >= ? AND timestamp < ?
        GROUP BY DATE(timestamp)
        """,
        (lower_bound, upper_bound),
    )
    spend_per_day: Dict[str, float] = {
        row["day"]: float(row["cost"] or 0.0) for row in cur.fetchall()
    }

    # Walk the window day by day, oldest first, defaulting missing days to 0.
    daily: List[Dict[str, Any]] = []
    cursor_day = first_day
    while cursor_day <= today:
        key = cursor_day.isoformat()
        daily.append({"date": key, "cost_usd": spend_per_day.get(key, 0.0)})
        cursor_day += timedelta(days=1)

    return {
        "total_spend_usd": spend,
        "total_tasks": task_total,
        "avg_cost_per_task": (spend / task_total) if task_total > 0 else 0.0,
        "daily": daily,
    }

def get_project_costs_aggregate(self, project_id: int) -> Dict[str, Any]:
"""Get aggregated cost statistics for a project.

Expand Down
130 changes: 130 additions & 0 deletions codeframe/ui/routers/costs_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Cost analytics API router for CodeFRAME v2 (issue #557).

Aggregates the workspace's `token_usage` table into a daily-bucket summary
for the /costs page in the web UI. Hosts a single endpoint:

GET /api/v2/costs/summary?days=30

Returns an empty-state payload (all zeros, zero-filled daily series) when
no spend data exists or the table isn't present — never 404.

The handler opens the workspace SQLite database directly to avoid the
pre-existing schema conflict between `codeframe/core/workspace.py` and
`codeframe/persistence/schema_manager.py` — wiring `TokenRepository`
to a raw connection skips `Database.initialize()` entirely.
"""

import asyncio
import logging
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Dict, List

from fastapi import APIRouter, Depends, Query, Request
from pydantic import BaseModel

from codeframe.core.workspace import Workspace
from codeframe.lib.rate_limiter import rate_limit_standard
from codeframe.persistence.repositories.token_repository import TokenRepository
from codeframe.ui.dependencies import get_v2_workspace

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v2/costs", tags=["metrics"])


class DailyCostPoint(BaseModel):
    """One day of aggregated spend in the costs summary series."""

    # UTC calendar date bucket in ISO format YYYY-MM-DD.
    date: str  # ISO format YYYY-MM-DD
    cost_usd: float  # Total estimated spend for that day, in US dollars.


class CostSummaryResponse(BaseModel):
    """Aggregated spend over the requested window.

    Mirrors the dict shape produced by TokenRepository.get_costs_summary.
    """

    total_spend_usd: float  # Sum of estimated costs in the window.
    total_tasks: int  # Count of distinct tasks with recorded usage.
    avg_cost_per_task: float  # total_spend_usd / total_tasks; 0.0 when no tasks.
    daily: List[DailyCostPoint]  # One point per day, oldest first, zero-filled.


def _empty_summary(days: int) -> Dict:
"""Build a zero-state response with `days` daily buckets."""
end_date = datetime.now(timezone.utc).date()
start_date = end_date - timedelta(days=days - 1)
daily = [
{"date": (start_date + timedelta(days=i)).isoformat(), "cost_usd": 0.0}
for i in range(days)
]
return {
"total_spend_usd": 0.0,
"total_tasks": 0,
"avg_cost_per_task": 0.0,
"daily": daily,
}


def _query_costs(db_path: str, days: int) -> Dict:
    """Query the workspace DB via TokenRepository on a raw connection.

    Returns an empty summary if the DB can't be opened or the table is missing,
    rather than raising — keeps the endpoint safe for fresh workspaces.

    The connection is opened read-only (SQLite URI ``mode=ro``): a plain
    ``sqlite3.connect(db_path)`` would CREATE the database file when it is
    missing, silently mutating workspace state on a read-only GET and
    masking a bad ``db_path``. With ``mode=ro`` a missing file raises
    ``sqlite3.OperationalError``, which falls through to the empty summary.

    TODO(schema-conflict): we open the connection directly rather than through
    `Database(...).initialize()` because the v2 workspace schema in
    `codeframe/core/workspace.py` and the global schema in
    `persistence/schema_manager.py` define `blockers` incompatibly, and
    `Database.initialize()` therefore crashes on existing workspace DBs.
    Remove this workaround once the two schemas converge.
    """
    try:
        # mode=ro prevents file creation; uri=True enables the file: scheme.
        conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row
    except sqlite3.Error as e:
        # Missing file, bad path, or permission problem — empty state.
        logger.warning("costs: failed to open %s: %s", db_path, e)
        return _empty_summary(days)

    try:
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='token_usage'"
            )
            if cursor.fetchone() is None:
                # Fresh workspace: no agent has recorded token usage yet.
                return _empty_summary(days)

            repo = TokenRepository(sync_conn=conn)
            return repo.get_costs_summary(days)
        except sqlite3.Error as e:
            # Locked DB, corrupted schema, etc. — fall back to empty state
            # rather than 500'ing the dashboard.
            logger.warning("costs: query failed on %s: %s", db_path, e)
            return _empty_summary(days)
    finally:
        conn.close()


@router.get("/summary", response_model=CostSummaryResponse)
@rate_limit_standard()
async def get_costs_summary(
    request: Request,
    workspace: Workspace = Depends(get_v2_workspace),
    days: int = Query(30, ge=7, le=90, description="Window size in days (7-90)"),
):
    """Return total spend, task count, average cost, and a daily series.

    Reads from the workspace's `token_usage` table. Returns zero-filled
    daily buckets so the client can render a chart without conditionals.
    If the table doesn't exist (no agent has run in this workspace yet),
    returns an empty-state response rather than an error.
    """
    # _query_costs performs blocking sqlite3 I/O; run it in a worker thread
    # so the event loop is not stalled under lock contention or on large
    # workspaces.
    summary = await asyncio.to_thread(_query_costs, str(workspace.db_path), days)
    return CostSummaryResponse(
        total_spend_usd=summary["total_spend_usd"],
        total_tasks=summary["total_tasks"],
        avg_cost_per_task=summary["avg_cost_per_task"],
        daily=[
            DailyCostPoint(date=d["date"], cost_usd=d["cost_usd"])
            for d in summary["daily"]
        ],
    )
2 changes: 2 additions & 0 deletions codeframe/ui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
batches_v2,
blockers_v2,
checkpoints_v2,
costs_v2,
diagnose_v2,
discovery_v2,
environment_v2,
Expand Down Expand Up @@ -483,6 +484,7 @@ async def test_broadcast(message: dict, project_id: int = None):
app.include_router(batches_v2.router) # /api/v2/batches
app.include_router(blockers_v2.router) # /api/v2/blockers
app.include_router(checkpoints_v2.router) # /api/v2/checkpoints
app.include_router(costs_v2.router) # /api/v2/costs
app.include_router(diagnose_v2.router) # /api/v2/tasks/{id}/diagnose
app.include_router(discovery_v2.router) # /api/v2/discovery
app.include_router(environment_v2.router) # /api/v2/env
Expand Down
Loading
Loading