diff --git a/Gradata/src/gradata/_migrations/__init__.py b/Gradata/src/gradata/_migrations/__init__.py
index eddd13bb..148364da 100644
--- a/Gradata/src/gradata/_migrations/__init__.py
+++ b/Gradata/src/gradata/_migrations/__init__.py
@@ -72,6 +72,15 @@
         timestamp TEXT NOT NULL,
         user_context TEXT
     )""",
+    """CREATE TABLE IF NOT EXISTS sync_queue (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        payload_json TEXT NOT NULL,
+        kind TEXT NOT NULL CHECK(kind IN ('correction','lesson','event')),
+        enqueued_at REAL NOT NULL,
+        synced_at REAL,
+        attempts INTEGER DEFAULT 0,
+        last_error TEXT
+    )""",
 ]
 
 _MIGRATIONS: list[str] = [
@@ -97,6 +106,7 @@
     "ALTER TABLE super_meta_rules ADD COLUMN applies_when TEXT",
     "ALTER TABLE super_meta_rules ADD COLUMN never_when TEXT",
     "ALTER TABLE super_meta_rules ADD COLUMN transfer_scope TEXT DEFAULT 'personal'",
+    "CREATE INDEX IF NOT EXISTS idx_sync_queue_pending ON sync_queue(synced_at) WHERE synced_at IS NULL",
 ]
 
 
diff --git a/Gradata/src/gradata/_sync_queue.py b/Gradata/src/gradata/_sync_queue.py
new file mode 100644
index 00000000..2d82a0fe
--- /dev/null
+++ b/Gradata/src/gradata/_sync_queue.py
@@ -0,0 +1,144 @@
+"""Sync queue primitives — local SQLite buffer for write-through cloud sync.
+
+Day 1 of #194 (write-through sync). Provides CRUD over the ``sync_queue``
+table that is created as part of the standard Brain schema (see
+``_migrations/__init__.py``). The queue stores serialized payloads that
+need to be flushed to the cloud ingest endpoint; the actual flush
+worker is implemented later in the rollout.
+
+All public functions take an explicit ``sqlite3.Connection`` as the first
+argument — there are no implicit globals or module-level state.
+"""
+
+from __future__ import annotations
+
+import json
+import sqlite3
+import time
+
+__all__ = ["enqueue", "peek_pending", "mark_synced", "mark_failed"]
+
+_VALID_KINDS = ("correction", "lesson", "event")
+_MAX_ERROR_LEN = 500
+
+
+def enqueue(conn: sqlite3.Connection, kind: str, payload: dict) -> int:
+    """Insert a new row into ``sync_queue`` and return its row id.
+
+    The insert runs inside ``with conn:``, so it is committed on success
+    and rolled back on failure. Both apply to the whole connection,
+    including any uncommitted work the caller already had pending on it.
+
+    Args:
+        conn: Active SQLite connection to a brain database.
+        kind: One of ``'correction'``, ``'lesson'``, ``'event'``. The
+            CHECK constraint on the column will reject anything else
+            with :class:`sqlite3.IntegrityError`.
+        payload: Arbitrary JSON-serializable dict; stored via
+            ``json.dumps`` into ``payload_json``.
+
+    Returns:
+        The autoincrement ``id`` of the newly inserted row.
+
+    Raises:
+        sqlite3.IntegrityError: If ``kind`` violates the CHECK constraint.
+        RuntimeError: If the cursor reports no ``lastrowid``.
+    """
+    payload_json = json.dumps(payload, ensure_ascii=False)
+    # ``with conn`` rather than a bare ``commit()``: a rejected insert must
+    # not leave the implicitly opened transaction dangling on the connection.
+    with conn:
+        cur = conn.execute(
+            "INSERT INTO sync_queue (payload_json, kind, enqueued_at) "
+            "VALUES (?, ?, ?)",
+            (payload_json, kind, time.time()),
+        )
+    rowid = cur.lastrowid
+    if rowid is None:
+        raise RuntimeError("sync_queue insert did not return a lastrowid")
+    return int(rowid)
+
+
+def peek_pending(conn: sqlite3.Connection, limit: int = 100) -> list[dict]:
+    """Return up to ``limit`` pending rows ordered by id (FIFO).
+
+    Pending == ``synced_at IS NULL``. Each returned dict has keys:
+    ``id`` (int), ``kind`` (str), ``payload`` (decoded dict),
+    ``enqueued_at`` (float), and ``attempts`` (int).
+    """
+    rows = conn.execute(
+        "SELECT id, kind, payload_json, enqueued_at, attempts "
+        "FROM sync_queue "
+        "WHERE synced_at IS NULL "
+        "ORDER BY id ASC "
+        "LIMIT ?",
+        (int(limit),),
+    ).fetchall()
+
+    out: list[dict] = []
+    for r in rows:
+        # Support both sqlite3.Row (named) and tuple rows.
+        try:
+            rid = r["id"]
+            kind = r["kind"]
+            payload_json = r["payload_json"]
+            enqueued_at = r["enqueued_at"]
+            attempts = r["attempts"]
+        except (TypeError, IndexError):
+            rid, kind, payload_json, enqueued_at, attempts = r
+        try:
+            payload = json.loads(payload_json) if payload_json else {}
+        except json.JSONDecodeError:
+            payload = {"_raw": payload_json}
+        out.append(
+            {
+                "id": int(rid),
+                "kind": kind,
+                "payload": payload,
+                "enqueued_at": float(enqueued_at) if enqueued_at is not None else None,
+                "attempts": int(attempts) if attempts is not None else 0,
+            }
+        )
+    return out
+
+
+def mark_synced(conn: sqlite3.Connection, ids: list[int]) -> None:
+    """Mark the given row ids as successfully synced (sets ``synced_at``).
+
+    An empty ``ids`` is a no-op. The update is committed on success and
+    rolled back if it raises.
+    """
+    # Materialize once so the placeholder count always matches the bound ids.
+    id_values = [int(i) for i in (ids or ())]
+    if not id_values:
+        return
+    placeholders = ",".join("?" for _ in id_values)
+    with conn:
+        conn.execute(
+            f"UPDATE sync_queue SET synced_at = ? WHERE id IN ({placeholders})",
+            (time.time(), *id_values),
+        )
+
+
+def mark_failed(conn: sqlite3.Connection, ids: list[int], error: str) -> None:
+    """Record a failed sync attempt for the given rows.
+
+    Increments ``attempts`` and stores the (truncated) error message on
+    every targeted row. The rows remain pending (``synced_at`` stays
+    NULL) so they will be retried. An empty ``ids`` is a no-op.
+
+    ``error`` is passed through ``str()`` before truncation, so an exception
+    instance is stored as its message; ``None`` is stored as ``''``.
+    """
+    id_values = [int(i) for i in (ids or ())]
+    if not id_values:
+        return
+    truncated = ("" if error is None else str(error))[:_MAX_ERROR_LEN]
+    placeholders = ",".join("?" for _ in id_values)
+    with conn:
+        conn.execute(
+            "UPDATE sync_queue "
+            "SET attempts = COALESCE(attempts, 0) + 1, last_error = ? "
+            f"WHERE id IN ({placeholders})",
+            (truncated, *id_values),
+        )
diff --git a/Gradata/src/gradata/onboard.py b/Gradata/src/gradata/onboard.py
index bd102085..0ec2abd8 100644
--- a/Gradata/src/gradata/onboard.py
+++ b/Gradata/src/gradata/onboard.py
@@ -103,6 +103,18 @@
         mention_count INTEGER DEFAULT 1
     )
     """,
+    # Write-through sync queue (cloud upload buffer, #194)
+    """
+    CREATE TABLE IF NOT EXISTS sync_queue (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        payload_json TEXT NOT NULL,
+        kind TEXT NOT NULL CHECK(kind IN ('correction','lesson','event')),
+        enqueued_at REAL NOT NULL,
+        synced_at REAL,
+        attempts INTEGER DEFAULT 0,
+        last_error TEXT
+    )
+    """,
 ]
 
 _INDEXES_SQL = [
@@ -113,6 +125,7 @@
     "CREATE INDEX IF NOT EXISTS idx_lesson_apps_lesson ON lesson_applications(lesson_id)",
     "CREATE INDEX IF NOT EXISTS idx_lesson_transitions_cat ON lesson_transitions(category)",
     "CREATE INDEX IF NOT EXISTS idx_lesson_transitions_desc ON lesson_transitions(lesson_desc)",
+    "CREATE INDEX IF NOT EXISTS idx_sync_queue_pending ON sync_queue(synced_at) WHERE synced_at IS NULL",
 ]
 
 # ── Subdirectories ────────────────────────────────────────────────────
diff --git a/Gradata/tests/test_sync_queue.py b/Gradata/tests/test_sync_queue.py
new file mode 100644
index 00000000..eb00bec8
--- /dev/null
+++ b/Gradata/tests/test_sync_queue.py
@@ -0,0 +1,131 @@
+"""Unit tests for sync_queue primitives (#194, day 1).
+
+Covers schema creation via the brain migration module plus CRUD over the
+``sync_queue`` table through :mod:`gradata._sync_queue`.
+"""
+
+from __future__ import annotations
+
+import sqlite3
+import time
+
+import pytest
+
+from gradata import _sync_queue
+from gradata._migrations import _BASE_TABLES, _MIGRATIONS
+
+
+def _make_conn() -> sqlite3.Connection:
+    """Return an in-memory SQLite connection with the sync_queue schema applied.
+
+    Pulls the canonical DDL straight from ``gradata._migrations`` so we
+    are testing the same schema a real brain would get.
+    """
+    conn = sqlite3.connect(":memory:")
+    conn.row_factory = sqlite3.Row
+    for sql in _BASE_TABLES:
+        if "sync_queue" in sql:
+            conn.execute(sql)
+    for sql in _MIGRATIONS:
+        if "sync_queue" in sql:
+            conn.execute(sql)
+    conn.commit()
+    return conn
+
+
+def test_sync_queue_enqueue_returns_int_and_peek_finds_row():
+    conn = _make_conn()
+    payload = {"draft": "hello", "final": "Hello.", "meta": {"n": 1}}
+
+    row_id = _sync_queue.enqueue(conn, "correction", payload)
+
+    assert isinstance(row_id, int)
+    assert row_id > 0
+
+    pending = _sync_queue.peek_pending(conn)
+    assert len(pending) == 1
+    row = pending[0]
+    assert row["id"] == row_id
+    assert row["kind"] == "correction"
+    # Payload roundtrips exactly through json.dumps/loads.
+    assert row["payload"] == payload
+    assert row["attempts"] == 0
+    assert isinstance(row["enqueued_at"], float)
+
+
+def test_sync_queue_peek_pending_respects_limit_and_fifo_order():
+    conn = _make_conn()
+    ids = [_sync_queue.enqueue(conn, "event", {"i": i}) for i in range(5)]
+
+    pending = _sync_queue.peek_pending(conn, limit=3)
+    assert [r["id"] for r in pending] == ids[:3]
+
+
+def test_sync_queue_mark_synced_removes_from_peek_pending():
+    conn = _make_conn()
+    id_a = _sync_queue.enqueue(conn, "correction", {"x": 1})
+    id_b = _sync_queue.enqueue(conn, "lesson", {"x": 2})
+
+    _sync_queue.mark_synced(conn, [id_a])
+
+    pending_ids = [r["id"] for r in _sync_queue.peek_pending(conn)]
+    assert id_a not in pending_ids
+    assert id_b in pending_ids
+
+    # synced_at should be a float timestamp on row A.
+    row = conn.execute(
+        "SELECT synced_at FROM sync_queue WHERE id = ?", (id_a,)
+    ).fetchone()
+    assert row["synced_at"] is not None
+    assert float(row["synced_at"]) <= time.time() + 1.0
+
+
+def test_sync_queue_mark_failed_increments_attempts_and_keeps_row_pending():
+    conn = _make_conn()
+    row_id = _sync_queue.enqueue(conn, "event", {"k": "v"})
+
+    _sync_queue.mark_failed(conn, [row_id], "boom: network unreachable")
+    _sync_queue.mark_failed(conn, [row_id], "boom again")
+
+    pending = _sync_queue.peek_pending(conn)
+    assert len(pending) == 1
+    assert pending[0]["id"] == row_id
+    assert pending[0]["attempts"] == 2
+
+    row = conn.execute(
+        "SELECT last_error, synced_at FROM sync_queue WHERE id = ?", (row_id,)
+    ).fetchone()
+    assert row["last_error"] == "boom again"
+    assert row["synced_at"] is None
+
+
+def test_sync_queue_mark_failed_truncates_long_error_to_500_chars():
+    conn = _make_conn()
+    row_id = _sync_queue.enqueue(conn, "event", {})
+    long_err = "x" * 5000
+
+    _sync_queue.mark_failed(conn, [row_id], long_err)
+
+    row = conn.execute(
+        "SELECT last_error FROM sync_queue WHERE id = ?", (row_id,)
+    ).fetchone()
+    assert len(row["last_error"]) == 500
+    assert row["last_error"] == "x" * 500
+
+
+def test_sync_queue_kind_constraint_rejects_invalid_value():
+    conn = _make_conn()
+    with pytest.raises(sqlite3.IntegrityError):
+        _sync_queue.enqueue(conn, "foo", {"bad": True})
+    # A rejected insert must be rolled back, not left as an open transaction.
+    assert not conn.in_transaction
+
+
+def test_sync_queue_empty_id_lists_are_noops():
+    conn = _make_conn()
+    # Should not raise, should not touch the DB.
+    _sync_queue.mark_synced(conn, [])
+    _sync_queue.mark_failed(conn, [], "nope")
+
+    pending = _sync_queue.peek_pending(conn)
+    assert pending == []