Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ Gradata/scripts/*
!Gradata/scripts/publish-npm.sh
!Gradata/scripts/cloud/
!Gradata/scripts/migrate_legacy_scopes.py
!Gradata/scripts/weekly_correction_snapshot.py

# npm sub-package build outputs (source tracked, outputs ignored)
Gradata/packages/npm/node_modules/
Expand Down
36 changes: 36 additions & 0 deletions Gradata/docs/weekly-correction-snapshot.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Weekly Correction Snapshot

`scripts/weekly_correction_snapshot.py` builds a deterministic JSON summary from newline-delimited JSON (NDJSON) events. This is intended for weekly correction-outcome trend reporting.

## Usage

From file:

```bash
python scripts/weekly_correction_snapshot.py --input /path/to/events.jsonl
```

From stdin:

```bash
cat /path/to/events.jsonl | python scripts/weekly_correction_snapshot.py
```

## Output schema

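The script always emits a single compact JSON object on one line. Keys are serialized in alphabetical order (`sort_keys=True`), so identical input always produces byte-identical output. Fields: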

- `total_corrections` (int): count of correction events (`event=correction.created` or `kind=correction`)
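- `accepted_graduations` (int): count of rows whose only outcome signal is acceptance (`event=lesson.graduated` or `event=graduation.accepted`, `outcome=accepted`, `accepted=true`, or `status=accepted`/`status=graduated`)
- `rejection_count` (int): count of rows whose only outcome signal is rejection (`event=graduation.rejected` or `event=lesson.rejected`, `outcome=rejected`, `accepted=false`, or `status=rejected`); rows that carry both acceptance and rejection signals are counted in neither total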
- `acceptance_rate` (float): `accepted_graduations / (accepted_graduations + rejection_count)`, rounded to 6 decimals, or `0.0` if denominator is zero
- `top_rule_categories` (list): up to 5 entries sorted by descending count, then category name
- `skipped_rows` (int): malformed or non-object rows ignored during parsing

`top_rule_categories` entries use:

```json
{"category":"tone","count":12}
```

Category normalization is lowercase + trimmed whitespace. Empty/missing categories normalize to `"unknown"`.
2 changes: 1 addition & 1 deletion Gradata/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ skips = [
# --- Pytest ---
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
pythonpath = ["src", "scripts"]
markers = [
"integration: tests that hit external LLM APIs (cost money, skip in CI)",
"dualwrite: dual-write crash recovery and reconciliation tests",
Expand Down
130 changes: 130 additions & 0 deletions Gradata/scripts/weekly_correction_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""Compute weekly correction/graduation aggregates from NDJSON events."""

from __future__ import annotations

import argparse
import json
import sys
from collections import Counter
from typing import Any


def _normalize_category(value: Any) -> str:
if value is None:
return "unknown"
normalized = str(value).strip().lower()
return normalized or "unknown"


def _is_correction(row: dict[str, Any]) -> bool:
event = str(row.get("event", "")).strip().lower()
kind = str(row.get("kind", "")).strip().lower()
return event == "correction.created" or kind == "correction"


def _is_graduation_accepted(row: dict[str, Any]) -> bool:
event = str(row.get("event", "")).strip().lower()
outcome = str(row.get("outcome", "")).strip().lower()
accepted_flag = row.get("accepted")
status = str(row.get("status", "")).strip().lower()
return (
event in {"lesson.graduated", "graduation.accepted"}
or outcome == "accepted"
or accepted_flag is True
or status in {"accepted", "graduated"}
)


def _is_rejection(row: dict[str, Any]) -> bool:
event = str(row.get("event", "")).strip().lower()
outcome = str(row.get("outcome", "")).strip().lower()
accepted_flag = row.get("accepted")
status = str(row.get("status", "")).strip().lower()
return (
event in {"graduation.rejected", "lesson.rejected"}
or outcome == "rejected"
or accepted_flag is False
or status == "rejected"
)


def parse_rows(lines: list[str]) -> tuple[list[dict[str, Any]], int]:
    """Parse NDJSON lines into object rows, counting the ones that are unusable.

    Args:
        lines: Raw text lines, with or without trailing newlines.

    Returns:
        A ``(rows, skipped)`` pair. ``rows`` holds every line that decoded to a
        JSON object, in input order. ``skipped`` counts non-blank lines that
        were either invalid JSON or valid JSON of a non-object type (array,
        string, number, ...). Blank lines are ignored and not counted.
    """
    rows: list[dict[str, Any]] = []
    skipped = 0
    for raw in lines:
        # A UTF-8 byte-order mark (U+FEFF) is not whitespace, so ``strip()``
        # leaves it in place, and ``json.loads`` refuses text that starts with
        # one. Files written by some Windows tools begin with a BOM; without
        # this the first valid row would be silently counted as skipped.
        line = raw.strip().lstrip("\ufeff").lstrip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            skipped += 1
            continue
        if not isinstance(row, dict):
            skipped += 1
            continue
        rows.append(row)
    return rows, skipped


def aggregate(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize parsed event rows into the weekly snapshot counters.

    Every row is checked independently for being a correction and for
    carrying an outcome. A row counts as accepted or rejected only when it
    signals exactly one of the two; rows with no signal or with conflicting
    signals affect neither total.

    Returns:
        A dict with ``total_corrections``, ``accepted_graduations``,
        ``rejection_count``, ``acceptance_rate`` (rounded to 6 decimals, or
        ``0.0`` when no outcome was counted) and ``top_rule_categories`` (at
        most five ``{"category", "count"}`` entries ordered by descending
        count, then category name).
    """
    per_category: Counter[str] = Counter()
    correction_total = 0
    accepted_total = 0
    rejected_total = 0

    for record in rows:
        if _is_correction(record):
            correction_total += 1
            per_category[_normalize_category(record.get("category"))] += 1

        accepted = _is_graduation_accepted(record)
        rejected = _is_rejection(record)
        if accepted == rejected:
            # Either no outcome signal at all, or contradictory signals.
            continue
        if accepted:
            accepted_total += 1
        else:
            rejected_total += 1

    decided = accepted_total + rejected_total
    if decided:
        rate = round(accepted_total / decided, 6)
    else:
        rate = 0.0

    ranked = sorted(per_category.items(), key=lambda pair: (-pair[1], pair[0]))
    leaders = [{"category": label, "count": tally} for label, tally in ranked[:5]]

    return {
        "total_corrections": correction_total,
        "accepted_graduations": accepted_total,
        "rejection_count": rejected_total,
        "acceptance_rate": rate,
        "top_rule_categories": leaders,
    }


def _read_lines(path: str | None) -> list[str]:
if path:
with open(path, encoding="utf-8") as handle:
return handle.readlines()
return sys.stdin.readlines()


def main(argv: list[str] | None = None) -> int:
    """Run the command-line entry point and return the process exit code.

    Reads NDJSON from ``--input`` (or standard input when the option is
    absent), aggregates it, adds the ``skipped_rows`` count, and writes the
    snapshot to standard output as one compact, key-sorted JSON line.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        Always ``0``.
    """
    cli = argparse.ArgumentParser(
        description="Compute correction-outcome aggregates for weekly trend snapshots."
    )
    cli.add_argument("--input", help="Path to newline-delimited JSON input file")
    options = cli.parse_args(argv)

    parsed, skipped = parse_rows(_read_lines(options.input))
    report = aggregate(parsed)
    report["skipped_rows"] = skipped

    # Sorted keys and fixed separators keep the output byte-stable across runs.
    encoded = json.dumps(report, sort_keys=True, separators=(",", ":"))
    sys.stdout.write(encoded)
    sys.stdout.write("\n")
    return 0


# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
4 changes: 4 additions & 0 deletions Gradata/tests/test_byo_key_provider.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from __future__ import annotations

import pytest

from gradata.llm.byo_key import BYOKeyProvider

pytest.importorskip("httpx")


class _Response:
def __init__(self, payload: dict):
Expand Down
112 changes: 112 additions & 0 deletions Gradata/tests/test_weekly_correction_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import annotations

import json

from scripts import weekly_correction_snapshot as snapshot


def test_parse_rows_skips_malformed_and_non_object_rows():
    """Invalid JSON and non-object rows are counted as skipped; blanks are not."""
    raw_lines = [
        '{"event":"correction.created","category":"tone"}',
        "not-json",
        '["array-row"]',
        "",
        " ",
    ]
    parsed, skipped_count = snapshot.parse_rows(raw_lines)
    assert skipped_count == 2
    assert len(parsed) == 1


def test_aggregate_empty_input_has_zero_division_safe_defaults():
    """With no rows, every counter is zero and the rate falls back to 0.0."""
    summary = snapshot.aggregate([])
    expected = {
        "total_corrections": 0,
        "accepted_graduations": 0,
        "rejection_count": 0,
        "acceptance_rate": 0.0,
        "top_rule_categories": [],
    }
    for field, value in expected.items():
        assert summary[field] == value


def test_aggregate_counts_and_top_categories_deterministically():
    """Counts, rate and category ranking come out the same for a mixed batch."""
    corrections = [
        {"event": "correction.created", "category": "Tone"},
        {"event": "correction.created", "category": "tone"},
        {"event": "correction.created", "category": "factual"},
        {"event": "correction.created", "category": " PROCESS "},
        {"kind": "correction", "category": ""},
    ]
    acceptances = [
        {"event": "lesson.graduated"},
        {"event": "graduation.accepted"},
        {"outcome": "accepted"},
    ]
    rejections = [
        {"event": "graduation.rejected"},
        {"accepted": False},
    ]

    summary = snapshot.aggregate(corrections + acceptances + rejections)

    assert summary["total_corrections"] == 5
    assert summary["accepted_graduations"] == 3
    assert summary["rejection_count"] == 2
    assert summary["acceptance_rate"] == 0.6
    # Ties on count are broken alphabetically by category name.
    assert summary["top_rule_categories"] == [
        {"category": "tone", "count": 2},
        {"category": "factual", "count": 1},
        {"category": "process", "count": 1},
        {"category": "unknown", "count": 1},
    ]


def test_main_emits_deterministic_json_with_skipped_rows(capsys, monkeypatch):
    """Reading from stdin yields the full snapshot, including skipped rows."""
    stdin_text = (
        '{"event":"correction.created","category":"tone"}\n'
        '{"event":"lesson.graduated"}\n'
        '{"event":"graduation.rejected"}\n'
        "bad-row\n"
    )
    expected = {
        "acceptance_rate": 0.5,
        "accepted_graduations": 1,
        "rejection_count": 1,
        "skipped_rows": 1,
        "top_rule_categories": [{"category": "tone", "count": 1}],
        "total_corrections": 1,
    }
    monkeypatch.setattr("sys.stdin.readlines", lambda: stdin_text.splitlines(keepends=True))

    exit_code = snapshot.main([])

    assert exit_code == 0
    emitted = capsys.readouterr().out
    assert json.loads(emitted) == expected

Comment thread
coderabbitai[bot] marked this conversation as resolved.

def test_main_emits_deterministic_json_with_input_file(capsys, tmp_path):
    """Reading from ``--input`` yields the same snapshot as the stdin path."""
    file_text = (
        '{"event":"correction.created","category":"tone"}\n'
        '{"event":"lesson.graduated"}\n'
        '{"event":"graduation.rejected"}\n'
        "bad-row\n"
    )
    expected = {
        "acceptance_rate": 0.5,
        "accepted_graduations": 1,
        "rejection_count": 1,
        "skipped_rows": 1,
        "top_rule_categories": [{"category": "tone", "count": 1}],
        "total_corrections": 1,
    }
    events_path = tmp_path / "events.ndjson"
    events_path.write_text(file_text, encoding="utf-8")

    exit_code = snapshot.main(["--input", str(events_path)])

    assert exit_code == 0
    emitted = capsys.readouterr().out
    assert json.loads(emitted) == expected


def test_aggregate_treats_rows_as_single_outcome():
    """Rows with both acceptance and rejection signals count toward neither total."""
    conflicting_rows = [
        {"event": "graduation.accepted", "outcome": "rejected", "accepted": True},
        {"event": "graduation.rejected", "outcome": "accepted", "accepted": False},
        {"accepted": True, "status": "rejected"},
        {"accepted": False, "status": "accepted"},
    ]
    summary = snapshot.aggregate(conflicting_rows)
    for field in ("accepted_graduations", "rejection_count"):
        assert summary[field] == 0
Loading