Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 121 additions & 6 deletions Gradata/src/gradata/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def cmd_status(args):
import time as _time
import urllib.error as _urllib_error
import urllib.request as _urllib_request
from datetime import datetime, timezone
from datetime import datetime

brain = _get_brain(args)
stats = brain.stats()
Expand Down Expand Up @@ -301,6 +301,116 @@ def cmd_status(args):
print(" (events schema not available)")


def cmd_projects(args):
"""List every project the SDK knows about from ``~/.gradata/projects.toml``.

Columns:
* ``name`` — project identifier from the registry
* ``brain_dir`` — path to the brain directory on disk
* ``rules`` — count of RULE_GRADUATED events in ``system.db``
(0 if the brain is fresh or the db is unreadable)
* ``last_correction`` — MAX(ts) of CORRECTION events, or ``(never)``
* ``sync_status`` — ``ok`` if brain_dir exists, ``missing`` if not,
``error`` if the db is present but unreadable

The registry schema is:

[[projects]]
name = "my-project"
brain_dir = "/path/to/.gradata/brain"

Missing registry file → friendly message + exit 0 (NOT a crash). Malformed
TOML → friendly error + exit 1.
"""
import json as _json
import sqlite3 as _sqlite3
import tomllib as _tomllib
from pathlib import Path as _Path

registry_path = _Path.home() / ".gradata" / "projects.toml"
as_json = getattr(args, "json", False)

if not registry_path.is_file():
if as_json:
print("[]")
else:
print("No projects registered. Run `gradata init <dir>` to add one.")
return

try:
with registry_path.open("rb") as fh:
data = _tomllib.load(fh)
except _tomllib.TOMLDecodeError as exc:
print(f"Error: malformed projects.toml ({exc})")
raise SystemExit(1) from None

raw_projects = data.get("projects", []) or []
rows = []
for proj in raw_projects:
name = proj.get("name", "(unnamed)")
brain_dir = proj.get("brain_dir", "")
Comment on lines +347 to +351
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate registry entry shape before dereferencing project fields.

raw_projects items are assumed to be dicts. A schema-invalid but parseable TOML value (e.g. projects = ["x"]) crashes with AttributeError at Line 350 instead of returning a friendly malformed-registry error.

Proposed fix
     raw_projects = data.get("projects", []) or []
     rows = []
     for proj in raw_projects:
+        if not isinstance(proj, dict):
+            print("Error: malformed projects.toml (each [[projects]] entry must be a table)")
+            raise SystemExit(1) from None
         name = proj.get("name", "(unnamed)")
         brain_dir = proj.get("brain_dir", "")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raw_projects = data.get("projects", []) or []
rows = []
for proj in raw_projects:
name = proj.get("name", "(unnamed)")
brain_dir = proj.get("brain_dir", "")
raw_projects = data.get("projects", []) or []
rows = []
for proj in raw_projects:
if not isinstance(proj, dict):
print("Error: malformed projects.toml (each [[projects]] entry must be a table)")
raise SystemExit(1) from None
name = proj.get("name", "(unnamed)")
brain_dir = proj.get("brain_dir", "")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/cli.py` around lines 347 - 351, Validate each item from
raw_projects before treating it as a dict: in the loop that iterates over
raw_projects (the for proj in raw_projects block where name = proj.get(...) and
brain_dir = proj.get(...)), check isinstance(proj, dict) and if not, raise or
return a clear malformed-registry error (or skip with a logged warning)
indicating the project entry has an unexpected type; ensure the error message
names the offending entry and/or index so users can fix their registry instead
of allowing an AttributeError to propagate.

rules_count = 0
last_correction = None
sync_status = "ok"

brain_path = _Path(brain_dir) if brain_dir else None
if not brain_path or not brain_path.is_dir():
sync_status = "missing"
else:
db_path = brain_path / "system.db"
if not db_path.is_file():
# Brain dir exists but no db yet — treat as fresh, not error.
sync_status = "ok"
else:
try:
con = _sqlite3.connect(str(db_path))
cur = con.cursor()
rules_count = cur.execute(
"SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'"
).fetchone()[0]
row = cur.execute(
"SELECT MAX(ts) FROM events WHERE type='CORRECTION'"
).fetchone()
last_correction = row[0] if row else None
con.close()
except (_sqlite3.OperationalError, _sqlite3.DatabaseError, OSError):
sync_status = "error"
Comment on lines +365 to +377
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Close SQLite connections on query failures.

If any query fails after connect() succeeds, execution jumps to except and con.close() is skipped. Repeated errors across many projects can leak file descriptors/locks.

Proposed fix
             else:
                 try:
-                    con = _sqlite3.connect(str(db_path))
-                    cur = con.cursor()
-                    rules_count = cur.execute(
-                        "SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'"
-                    ).fetchone()[0]
-                    row = cur.execute(
-                        "SELECT MAX(ts) FROM events WHERE type='CORRECTION'"
-                    ).fetchone()
-                    last_correction = row[0] if row else None
-                    con.close()
+                    with _sqlite3.connect(str(db_path)) as con:
+                        cur = con.cursor()
+                        rules_count = cur.execute(
+                            "SELECT COUNT(*) FROM events WHERE type='RULE_GRADUATED'"
+                        ).fetchone()[0]
+                        row = cur.execute(
+                            "SELECT MAX(ts) FROM events WHERE type='CORRECTION'"
+                        ).fetchone()
+                        last_correction = row[0] if row else None
                 except (_sqlite3.OperationalError, _sqlite3.DatabaseError, OSError):
                     sync_status = "error"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/cli.py` around lines 365 - 377, The code currently opens
a SQLite connection with _sqlite3.connect(...) and calls con.close() only in the
try path, so if a query raises an exception the connection is leaked; modify the
block around con, cur, rules_count and last_correction so the connection is
always closed — either use a context manager (with
_sqlite3.connect(str(db_path)) as con:) or ensure con.close() is called in a
finally block and guard against con being undefined if connect() fails; update
the code that uses cur.execute(...) and the except clause to preserve existing
error handling while guaranteeing con.close() runs.


rows.append(
{
"name": name,
"brain_dir": brain_dir,
"rules": rules_count,
"last_correction": last_correction,
"sync_status": sync_status,
}
)

if as_json:
print(_json.dumps(rows, indent=2))
return

if not rows:
print("No projects registered. Run `gradata init <dir>` to add one.")
return

headers = ("NAME", "BRAIN_DIR", "RULES", "LAST_CORRECTION", "SYNC_STATUS")
table: list[tuple[str, str, str, str, str]] = [headers]
for r in rows:
table.append(
(
str(r["name"]),
str(r["brain_dir"]),
str(r["rules"]),
str(r["last_correction"] or "(never)"),
str(r["sync_status"]),
)
)
widths = [max(len(row[i]) for row in table) for i in range(len(headers))]
for row in table:
print(" ".join(cell.ljust(widths[i]) for i, cell in enumerate(row)))


def cmd_audit(args):
from gradata._audit import format_audit_text, run_audit

Expand Down Expand Up @@ -783,20 +893,20 @@ def cmd_prove(args):
den = sum((x - mean_x) ** 2 for x in xs) or 1.0
slope = num / den

print(f"Corrections per session:")
print("Corrections per session:")
print(f" Sessions: {n}")
print(f" Total corrections: {sum(counts)}")
print(f" Mean: {mean_y:.1f}/session")
if n >= 3:
print(f" Trend slope: {slope:+.3f} corrections/session")
if slope < -0.05:
print(f" Verdict: CONVERGING (brain is learning — fewer corrections over time)")
print(" Verdict: CONVERGING (brain is learning — fewer corrections over time)")
elif slope > 0.05:
print(f" Verdict: DIVERGING (corrections rising — brain may need tuning)")
print(" Verdict: DIVERGING (corrections rising — brain may need tuning)")
else:
print(f" Verdict: STABLE (flat trend)")
print(" Verdict: STABLE (flat trend)")
else:
print(f" Trend: need >=3 sessions to estimate")
print(" Trend: need >=3 sessions to estimate")

# Rule application rate
total_apps = sum(rule_apps_by_session.values())
Expand Down Expand Up @@ -1867,6 +1977,10 @@ def main():
# status (umbrella health check: stats + daemon + cloud + convergence)
sub.add_parser("status", help="Single-page brain/daemon/cloud summary")

# projects (registry listing from ~/.gradata/projects.toml)
p_projects = sub.add_parser("projects", help="List registered projects")
p_projects.add_argument("--json", action="store_true", help="Emit JSON array")

# audit
p_audit = sub.add_parser("audit", help="Data flow audit")
p_audit.add_argument("--json", action="store_true")
Expand Down Expand Up @@ -2206,6 +2320,7 @@ def main():
"manifest": cmd_manifest,
"stats": cmd_stats,
"status": cmd_status,
"projects": cmd_projects,
"audit": cmd_audit,
"sync": cmd_sync,
"recall": cmd_recall,
Expand Down
179 changes: 179 additions & 0 deletions Gradata/tests/test_projects_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Tests for the ``gradata projects`` subcommand (GRA-1238).

Covers:
* missing registry → friendly message, exit 0
* empty registry → friendly message, exit 0
* single project (brain_dir missing on disk → sync_status='missing')
* multi-project registry
* malformed TOML → SystemExit(1) with error message
* --json output shape
* missing brain_dir → sync_status='missing' (explicit case)
* brain_dir present + no system.db → sync_status='ok', rules=0
"""

from __future__ import annotations

import json
from pathlib import Path
from unittest.mock import patch

import pytest

from gradata.cli import cmd_projects


class _Args:
"""Minimal stand-in for argparse.Namespace."""

def __init__(self, *, json: bool = False) -> None:
self.json = json


def _patch_home(tmp_path: Path):
"""Return a patcher pinning ``Path.home()`` to tmp_path for cmd_projects."""
return patch("pathlib.Path.home", return_value=tmp_path)


def _write_registry(home: Path, contents: str) -> None:
cfg_dir = home / ".gradata"
cfg_dir.mkdir(parents=True, exist_ok=True)
(cfg_dir / "projects.toml").write_text(contents, encoding="utf-8")


def test_missing_registry_prints_hint_and_exits_zero(tmp_path, capsys):
# No projects.toml at all → friendly message, no crash, exit 0 (function returns).
with _patch_home(tmp_path):
cmd_projects(_Args())
out = capsys.readouterr().out
assert "No projects registered" in out
assert "gradata init" in out


def test_missing_registry_json_returns_empty_array(tmp_path, capsys):
with _patch_home(tmp_path):
cmd_projects(_Args(json=True))
out = capsys.readouterr().out.strip()
assert json.loads(out) == []


def test_empty_registry_prints_hint(tmp_path, capsys):
_write_registry(tmp_path, "") # valid TOML, no [[projects]]
with _patch_home(tmp_path):
cmd_projects(_Args())
out = capsys.readouterr().out
assert "No projects registered" in out


def test_single_project_missing_brain_dir(tmp_path, capsys):
_write_registry(
tmp_path,
'[[projects]]\nname = "alpha"\nbrain_dir = "/nonexistent/path/brain"\n',
)
with _patch_home(tmp_path):
cmd_projects(_Args())
out = capsys.readouterr().out
# Header + one row.
assert "NAME" in out
assert "alpha" in out
assert "missing" in out
assert "(never)" in out


def test_single_project_brain_dir_exists_no_db(tmp_path, capsys):
brain_dir = tmp_path / "my-brain"
brain_dir.mkdir()
_write_registry(
tmp_path,
f'[[projects]]\nname = "alpha"\nbrain_dir = "{brain_dir.as_posix()}"\n',
)
with _patch_home(tmp_path):
cmd_projects(_Args(json=True))
payload = json.loads(capsys.readouterr().out)
assert len(payload) == 1
row = payload[0]
assert row["name"] == "alpha"
# Compare via Path normalization so Windows '/' vs '\\' doesn't break the test
assert Path(row["brain_dir"]) == Path(brain_dir.as_posix())
assert row["rules"] == 0
assert row["last_correction"] is None
assert row["sync_status"] == "ok"


def test_multi_project_registry(tmp_path, capsys):
other_dir = tmp_path / "other-brain"
other_dir.mkdir()
_write_registry(
tmp_path,
f"""
[[projects]]
name = "alpha"
brain_dir = "/nope/alpha"

[[projects]]
name = "beta"
brain_dir = "{other_dir.as_posix()}"
""",
)
with _patch_home(tmp_path):
cmd_projects(_Args(json=True))
payload = json.loads(capsys.readouterr().out)
assert len(payload) == 2
names = {row["name"] for row in payload}
assert names == {"alpha", "beta"}
by_name = {row["name"]: row for row in payload}
assert by_name["alpha"]["sync_status"] == "missing"
assert by_name["beta"]["sync_status"] == "ok"


def test_malformed_toml_exits_one(tmp_path, capsys):
_write_registry(tmp_path, "this = is = broken =\n[[[\n")
with _patch_home(tmp_path), pytest.raises(SystemExit) as excinfo:
cmd_projects(_Args())
assert excinfo.value.code == 1
out = capsys.readouterr().out
assert "malformed" in out.lower() or "error" in out.lower()


def test_json_output_shape(tmp_path, capsys):
_write_registry(
tmp_path,
'[[projects]]\nname = "alpha"\nbrain_dir = "/nope/alpha"\n',
)
with _patch_home(tmp_path):
cmd_projects(_Args(json=True))
payload = json.loads(capsys.readouterr().out)
assert isinstance(payload, list)
assert payload[0].keys() == {
"name",
"brain_dir",
"rules",
"last_correction",
"sync_status",
}


def test_real_db_with_rule_graduated_event(tmp_path, capsys):
"""Real sqlite db with an events table → rules count is read."""
import sqlite3

brain_dir = tmp_path / "brain-x"
brain_dir.mkdir()
db = brain_dir / "system.db"
con = sqlite3.connect(str(db))
con.execute("CREATE TABLE events (type TEXT, ts TEXT)")
con.execute("INSERT INTO events (type, ts) VALUES ('RULE_GRADUATED', '2025-01-01')")
con.execute("INSERT INTO events (type, ts) VALUES ('RULE_GRADUATED', '2025-01-02')")
con.execute("INSERT INTO events (type, ts) VALUES ('CORRECTION', '2025-05-10T12:00:00Z')")
con.commit()
con.close()

_write_registry(
tmp_path,
f'[[projects]]\nname = "x"\nbrain_dir = "{brain_dir.as_posix()}"\n',
)
with _patch_home(tmp_path):
cmd_projects(_Args(json=True))
payload = json.loads(capsys.readouterr().out)
assert payload[0]["rules"] == 2
assert payload[0]["last_correction"] == "2025-05-10T12:00:00Z"
assert payload[0]["sync_status"] == "ok"
Loading