Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Gradata/src/gradata/_migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@
timestamp TEXT NOT NULL,
user_context TEXT
)""",
"""CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
payload_json TEXT NOT NULL,
kind TEXT NOT NULL CHECK(kind IN ('correction','lesson','event')),
enqueued_at REAL NOT NULL,
synced_at REAL,
attempts INTEGER DEFAULT 0,
last_error TEXT
)""",
]

_MIGRATIONS: list[str] = [
Expand All @@ -97,6 +106,7 @@
"ALTER TABLE super_meta_rules ADD COLUMN applies_when TEXT",
"ALTER TABLE super_meta_rules ADD COLUMN never_when TEXT",
"ALTER TABLE super_meta_rules ADD COLUMN transfer_scope TEXT DEFAULT 'personal'",
"CREATE INDEX IF NOT EXISTS idx_sync_queue_pending ON sync_queue(synced_at) WHERE synced_at IS NULL",
]


Expand Down
125 changes: 125 additions & 0 deletions Gradata/src/gradata/_sync_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Sync queue primitives — local SQLite buffer for write-through cloud sync.

Day 1 of #194 (write-through sync). Provides CRUD over the ``sync_queue``
table that is created as part of the standard Brain schema (see
``_migrations/__init__.py``). The queue stores serialized payloads that
need to be flushed to the cloud ingest endpoint; the actual flush
worker is implemented later in the rollout.

All public functions take an explicit ``sqlite3.Connection`` as the first
argument — there are no implicit globals or module-level state.
"""

from __future__ import annotations

import json
import sqlite3
import time

__all__ = ["enqueue", "peek_pending", "mark_synced", "mark_failed"]

_VALID_KINDS = ("correction", "lesson", "event")
_MAX_ERROR_LEN = 500


def enqueue(conn: sqlite3.Connection, kind: str, payload: dict) -> int:
"""Insert a new row into ``sync_queue`` and return its row id.

Args:
conn: Active SQLite connection to a brain database.
kind: One of ``'correction'``, ``'lesson'``, ``'event'``. The
CHECK constraint on the column will reject anything else
with :class:`sqlite3.IntegrityError`.
payload: Arbitrary JSON-serializable dict; stored via
``json.dumps`` into ``payload_json``.

Returns:
The autoincrement ``id`` of the newly inserted row.
"""
payload_json = json.dumps(payload, ensure_ascii=False)
cur = conn.execute(
"INSERT INTO sync_queue (payload_json, kind, enqueued_at) "
"VALUES (?, ?, ?)",
(payload_json, kind, time.time()),
)
conn.commit()
rowid = cur.lastrowid
if rowid is None:
raise RuntimeError("sync_queue insert did not return a lastrowid")
return int(rowid)


def peek_pending(conn: sqlite3.Connection, limit: int = 100) -> list[dict]:
"""Return up to ``limit`` pending rows ordered by id (FIFO).

Pending == ``synced_at IS NULL``. Each returned dict has keys:
``id`` (int), ``kind`` (str), ``payload`` (decoded dict),
``enqueued_at`` (float), and ``attempts`` (int).
"""
rows = conn.execute(
"SELECT id, kind, payload_json, enqueued_at, attempts "
"FROM sync_queue "
"WHERE synced_at IS NULL "
"ORDER BY id ASC "
"LIMIT ?",
(int(limit),),
).fetchall()

out: list[dict] = []
for r in rows:
# Support both sqlite3.Row (named) and tuple rows.
try:
rid = r["id"]
kind = r["kind"]
payload_json = r["payload_json"]
enqueued_at = r["enqueued_at"]
attempts = r["attempts"]
except (TypeError, IndexError):
rid, kind, payload_json, enqueued_at, attempts = r
try:
payload = json.loads(payload_json) if payload_json else {}
except json.JSONDecodeError:
payload = {"_raw": payload_json}
out.append(
{
"id": int(rid),
"kind": kind,
"payload": payload,
"enqueued_at": float(enqueued_at) if enqueued_at is not None else None,
"attempts": int(attempts) if attempts is not None else 0,
}
)
return out


def mark_synced(conn: sqlite3.Connection, ids: list[int]) -> None:
"""Mark the given row ids as successfully synced (sets ``synced_at``)."""
if not ids:
return
now = time.time()
placeholders = ",".join("?" for _ in ids)
conn.execute(
f"UPDATE sync_queue SET synced_at = ? WHERE id IN ({placeholders})",
(now, *[int(i) for i in ids]),
)
conn.commit()


def mark_failed(conn: sqlite3.Connection, ids: list[int], error: str) -> None:
"""Record a failed sync attempt for the given rows.

Increments ``attempts`` and stores the (truncated) error message on
every targeted row. The rows remain pending (``synced_at`` stays
NULL) so they will be retried.
"""
if not ids:
return
truncated = (error or "")[:_MAX_ERROR_LEN]
placeholders = ",".join("?" for _ in ids)
conn.execute(
f"UPDATE sync_queue "
f"SET attempts = COALESCE(attempts, 0) + 1, last_error = ? "
f"WHERE id IN ({placeholders})",
(truncated, *[int(i) for i in ids]),
)
conn.commit()
13 changes: 13 additions & 0 deletions Gradata/src/gradata/onboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@
mention_count INTEGER DEFAULT 1
)
""",
# Write-through sync queue (cloud upload buffer, #194)
"""
CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
payload_json TEXT NOT NULL,
kind TEXT NOT NULL CHECK(kind IN ('correction','lesson','event')),
enqueued_at REAL NOT NULL,
synced_at REAL,
attempts INTEGER DEFAULT 0,
last_error TEXT
)
""",
]

_INDEXES_SQL = [
Expand All @@ -113,6 +125,7 @@
"CREATE INDEX IF NOT EXISTS idx_lesson_apps_lesson ON lesson_applications(lesson_id)",
"CREATE INDEX IF NOT EXISTS idx_lesson_transitions_cat ON lesson_transitions(category)",
"CREATE INDEX IF NOT EXISTS idx_lesson_transitions_desc ON lesson_transitions(lesson_desc)",
"CREATE INDEX IF NOT EXISTS idx_sync_queue_pending ON sync_queue(synced_at) WHERE synced_at IS NULL",
]

# ── Subdirectories ────────────────────────────────────────────────────
Expand Down
129 changes: 129 additions & 0 deletions Gradata/tests/test_sync_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Unit tests for sync_queue primitives (#194, day 1).

Covers schema creation via the brain migration module plus CRUD over the
``sync_queue`` table through :mod:`gradata._sync_queue`.
"""

from __future__ import annotations

import sqlite3
import time

import pytest

from gradata import _sync_queue
from gradata._migrations import _BASE_TABLES, _MIGRATIONS


def _make_conn() -> sqlite3.Connection:
"""Return an in-memory SQLite connection with the sync_queue schema applied.

Pulls the canonical DDL straight from ``gradata._migrations`` so we
are testing the same schema a real brain would get.
"""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
for sql in _BASE_TABLES:
if "sync_queue" in sql:
conn.execute(sql)
for sql in _MIGRATIONS:
if "sync_queue" in sql:
conn.execute(sql)
conn.commit()
return conn


def test_sync_queue_enqueue_returns_int_and_peek_finds_row():
conn = _make_conn()
payload = {"draft": "hello", "final": "Hello.", "meta": {"n": 1}}

row_id = _sync_queue.enqueue(conn, "correction", payload)

assert isinstance(row_id, int)
assert row_id > 0

pending = _sync_queue.peek_pending(conn)
assert len(pending) == 1
row = pending[0]
assert row["id"] == row_id
assert row["kind"] == "correction"
# Payload roundtrips exactly through json.dumps/loads.
assert row["payload"] == payload
assert row["attempts"] == 0
assert isinstance(row["enqueued_at"], float)


def test_sync_queue_peek_pending_respects_limit_and_fifo_order():
conn = _make_conn()
ids = [_sync_queue.enqueue(conn, "event", {"i": i}) for i in range(5)]

pending = _sync_queue.peek_pending(conn, limit=3)
assert [r["id"] for r in pending] == ids[:3]


def test_sync_queue_mark_synced_removes_from_peek_pending():
conn = _make_conn()
id_a = _sync_queue.enqueue(conn, "correction", {"x": 1})
id_b = _sync_queue.enqueue(conn, "lesson", {"x": 2})

_sync_queue.mark_synced(conn, [id_a])

pending_ids = [r["id"] for r in _sync_queue.peek_pending(conn)]
assert id_a not in pending_ids
assert id_b in pending_ids

# synced_at should be a float timestamp on row A.
row = conn.execute(
"SELECT synced_at FROM sync_queue WHERE id = ?", (id_a,)
).fetchone()
assert row["synced_at"] is not None
assert float(row["synced_at"]) <= time.time() + 1.0


def test_sync_queue_mark_failed_increments_attempts_and_keeps_row_pending():
conn = _make_conn()
row_id = _sync_queue.enqueue(conn, "event", {"k": "v"})

_sync_queue.mark_failed(conn, [row_id], "boom: network unreachable")
_sync_queue.mark_failed(conn, [row_id], "boom again")

pending = _sync_queue.peek_pending(conn)
assert len(pending) == 1
assert pending[0]["id"] == row_id
assert pending[0]["attempts"] == 2

row = conn.execute(
"SELECT last_error, synced_at FROM sync_queue WHERE id = ?", (row_id,)
).fetchone()
assert row["last_error"] == "boom again"
assert row["synced_at"] is None


def test_sync_queue_mark_failed_truncates_long_error_to_500_chars():
conn = _make_conn()
row_id = _sync_queue.enqueue(conn, "event", {})
long_err = "x" * 5000

_sync_queue.mark_failed(conn, [row_id], long_err)

row = conn.execute(
"SELECT last_error FROM sync_queue WHERE id = ?", (row_id,)
).fetchone()
assert len(row["last_error"]) == 500
assert row["last_error"] == "x" * 500


def test_sync_queue_kind_constraint_rejects_invalid_value():
conn = _make_conn()
with pytest.raises(sqlite3.IntegrityError):
_sync_queue.enqueue(conn, "foo", {"bad": True})


def test_sync_queue_empty_id_lists_are_noops():
conn = _make_conn()
# Should not raise, should not touch the DB.
_sync_queue.mark_synced(conn, [])
_sync_queue.mark_failed(conn, [], "nope")

pending = _sync_queue.peek_pending(conn)
assert pending == []
Loading