Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Gradata/src/gradata/_migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@
timestamp TEXT NOT NULL,
user_context TEXT
)""",
# Write-through sync queue (cloud upload buffer, #194). Idempotent:
# if PR #195 lands first this block is a no-op via CREATE IF NOT EXISTS.
"""CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
payload_json TEXT NOT NULL,
kind TEXT NOT NULL CHECK(kind IN ('correction','lesson','event')),
enqueued_at REAL NOT NULL,
synced_at REAL,
attempts INTEGER DEFAULT 0,
last_error TEXT
)""",
]

_MIGRATIONS: list[str] = [
Expand All @@ -97,6 +108,7 @@
"ALTER TABLE super_meta_rules ADD COLUMN applies_when TEXT",
"ALTER TABLE super_meta_rules ADD COLUMN never_when TEXT",
"ALTER TABLE super_meta_rules ADD COLUMN transfer_scope TEXT DEFAULT 'personal'",
"CREATE INDEX IF NOT EXISTS idx_sync_queue_pending ON sync_queue(synced_at) WHERE synced_at IS NULL",
]


Expand Down
155 changes: 155 additions & 0 deletions Gradata/src/gradata/_sync_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Sync queue primitives — local SQLite buffer for write-through cloud sync.

Day 1 of #194 (write-through sync). Provides CRUD over the ``sync_queue``
table that is created as part of the standard Brain schema (see
``_migrations/__init__.py``). The queue stores serialized payloads that
need to be flushed to the cloud ingest endpoint; the actual flush
worker is implemented in :mod:`gradata._sync_worker`.

All public functions take an explicit ``sqlite3.Connection`` as the first
argument — there are no implicit globals or module-level state.
"""

from __future__ import annotations

import json
import sqlite3
import time

__all__ = [
"enqueue",
"enqueue_correction",
"mark_failed",
"mark_synced",
"peek_pending",
]

_VALID_KINDS = ("correction", "lesson", "event")
_MAX_ERROR_LEN = 500


def enqueue(conn: sqlite3.Connection, kind: str, payload: dict) -> int:
"""Insert a new row into ``sync_queue`` and return its row id.

Args:
conn: Active SQLite connection to a brain database.
kind: One of ``'correction'``, ``'lesson'``, ``'event'``. The
CHECK constraint on the column will reject anything else
with :class:`sqlite3.IntegrityError`.
payload: Arbitrary JSON-serializable dict; stored via
``json.dumps`` into ``payload_json``.

Returns:
The autoincrement ``id`` of the newly inserted row.
"""
payload_json = json.dumps(payload, ensure_ascii=False)
cur = conn.execute(
"INSERT INTO sync_queue (payload_json, kind, enqueued_at) VALUES (?, ?, ?)",
(payload_json, kind, time.time()),
)
conn.commit()
rowid = cur.lastrowid
if rowid is None:
raise RuntimeError("sync_queue insert did not return a lastrowid")
return int(rowid)


def enqueue_correction(
conn: sqlite3.Connection,
brain_id: str,
correction: dict,
event_id: str | None = None,
) -> int:
"""Convenience: enqueue a correction in the shape the cloud /ingest expects.

Wraps :func:`enqueue` with ``kind='correction'`` and the
``IngestRequest`` payload shape::

{"brain_id": <brain_id>, "correction": <correction>, "event_id": <event_id>}

The cloud receiver (``POST /api/v1/ingest``) deduplicates on
``event_id`` — pass a deterministic id (e.g. derived from
session/timestamp/content hash) to get idempotent retries.
"""
payload: dict = {
"brain_id": brain_id,
"correction": correction,
"event_id": event_id,
}
return enqueue(conn, "correction", payload)


def peek_pending(conn: sqlite3.Connection, limit: int = 100) -> list[dict]:
"""Return up to ``limit`` pending rows ordered by id (FIFO).

Pending == ``synced_at IS NULL``. Each returned dict has keys:
``id`` (int), ``kind`` (str), ``payload`` (decoded dict),
``enqueued_at`` (float), and ``attempts`` (int).
"""
rows = conn.execute(
"SELECT id, kind, payload_json, enqueued_at, attempts "
"FROM sync_queue "
"WHERE synced_at IS NULL "
"ORDER BY id ASC "
"LIMIT ?",
(int(limit),),
).fetchall()

out: list[dict] = []
for r in rows:
# Support both sqlite3.Row (named) and tuple rows.
try:
rid = r["id"]
kind = r["kind"]
payload_json = r["payload_json"]
enqueued_at = r["enqueued_at"]
attempts = r["attempts"]
except (TypeError, IndexError):
rid, kind, payload_json, enqueued_at, attempts = r
try:
payload = json.loads(payload_json) if payload_json else {}
except json.JSONDecodeError:
payload = {"_raw": payload_json}
out.append(
{
"id": int(rid),
"kind": kind,
"payload": payload,
"enqueued_at": float(enqueued_at) if enqueued_at is not None else None,
"attempts": int(attempts) if attempts is not None else 0,
}
)
return out


def mark_synced(conn: sqlite3.Connection, ids: list[int]) -> None:
"""Mark the given row ids as successfully synced (sets ``synced_at``)."""
if not ids:
return
now = time.time()
placeholders = ",".join("?" for _ in ids)
conn.execute(
f"UPDATE sync_queue SET synced_at = ? WHERE id IN ({placeholders})",
(now, *[int(i) for i in ids]),
)
conn.commit()


def mark_failed(conn: sqlite3.Connection, ids: list[int], error: str) -> None:
"""Record a failed sync attempt for the given rows.

Increments ``attempts`` and stores the (truncated) error message on
every targeted row. The rows remain pending (``synced_at`` stays
NULL) so they will be retried.
"""
if not ids:
return
truncated = (error or "")[:_MAX_ERROR_LEN]
placeholders = ",".join("?" for _ in ids)
conn.execute(
f"UPDATE sync_queue "
f"SET attempts = COALESCE(attempts, 0) + 1, last_error = ? "
f"WHERE id IN ({placeholders})",
(truncated, *[int(i) for i in ids]),
)
conn.commit()
Loading
Loading