Skip to content

Commit 188a50b

Browse files
committed
Track opcounts
1 parent 0386fb5 commit 188a50b

File tree

7 files changed

+156
-14
lines changed

7 files changed

+156
-14
lines changed

crates/core/src/migrations.rs

+16
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use crate::bucket_priority::BucketPriority;
1212
use crate::error::{PSResult, SQLiteError};
1313
use crate::fix035::apply_v035_fix;
1414

15+
pub const LATEST_VERSION: i32 = 9;
16+
1517
pub fn powersync_migrate(
1618
ctx: *mut sqlite::context,
1719
target_version: i32,
@@ -354,5 +356,19 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 8')
354356
local_db.exec_safe(&stmt).into_db_result(local_db)?;
355357
}
356358

359+
if current_version < 9 && target_version >= 9 {
360+
let stmt = "\
361+
ALTER TABLE ps_buckets ADD COLUMN count_at_last INTEGER NOT NULL DEFAULT 0;
362+
ALTER TABLE ps_buckets ADD COLUMN count_since_last INTEGER NOT NULL DEFAULT 0;
363+
INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array(
364+
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN count_at_last'),
365+
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN count_since_last'),
366+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 9')
367+
));
368+
";
369+
370+
local_db.exec_safe(stmt).into_db_result(local_db)?;
371+
}
372+
357373
Ok(())
358374
}

crates/core/src/operations.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
103103
let mut last_op: Option<i64> = None;
104104
let mut add_checksum: i32 = 0;
105105
let mut op_checksum: i32 = 0;
106+
let mut added_ops: i32 = 0;
106107

107108
while iterate_statement.step()? == ResultCode::ROW {
108109
let op_id = iterate_statement.column_int64(0)?;
@@ -113,6 +114,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
113114
let op_data = iterate_statement.column_text(5);
114115

115116
last_op = Some(op_id);
117+
added_ops += 1;
116118

117119
if op == "PUT" || op == "REMOVE" {
118120
let key: String;
@@ -236,13 +238,15 @@ WHERE bucket = ?1",
236238
"UPDATE ps_buckets
237239
SET last_op = ?2,
238240
add_checksum = (add_checksum + ?3) & 0xffffffff,
239-
op_checksum = (op_checksum + ?4) & 0xffffffff
241+
op_checksum = (op_checksum + ?4) & 0xffffffff,
242+
count_since_last = count_since_last + ?5
240243
WHERE id = ?1",
241244
)?;
242245
statement.bind_int64(1, bucket_id)?;
243246
statement.bind_int64(2, *last_op)?;
244247
statement.bind_int(3, add_checksum)?;
245248
statement.bind_int(4, op_checksum)?;
249+
statement.bind_int(5, added_ops)?;
246250

247251
statement.exec()?;
248252
}

crates/core/src/sync_local.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,10 @@ GROUP BY b.row_type, b.row_id",
264264
.db
265265
.prepare_v2( "\
266266
UPDATE ps_buckets
267-
SET last_applied_op = last_op
268-
WHERE last_applied_op != last_op AND
267+
SET last_applied_op = last_op,
268+
count_since_last = 0,
269+
count_at_last = count_at_last + count_since_last
270+
WHERE ((last_applied_op != last_op) OR count_since_last) AND
269271
name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))",
270272
)
271273
.into_db_result(self.db)?;
@@ -277,8 +279,10 @@ GROUP BY b.row_type, b.row_id",
277279
self.db
278280
.exec_safe(
279281
"UPDATE ps_buckets
280-
SET last_applied_op = last_op
281-
WHERE last_applied_op != last_op",
282+
SET last_applied_op = last_op,
283+
count_since_last = 0,
284+
count_at_last = count_at_last + count_since_last
285+
WHERE (last_applied_op != last_op) OR count_since_last",
282286
)
283287
.into_db_result(self.db)?;
284288
}

crates/core/src/view_admin.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use sqlite_nostd as sqlite;
1111
use sqlite_nostd::{Connection, Context};
1212

1313
use crate::error::SQLiteError;
14-
use crate::migrations::powersync_migrate;
14+
use crate::migrations::{powersync_migrate, LATEST_VERSION};
1515
use crate::util::quote_identifier;
1616
use crate::{create_auto_tx_function, create_sqlite_text_fn};
1717

@@ -120,7 +120,7 @@ fn powersync_init_impl(
120120

121121
setup_internal_views(local_db)?;
122122

123-
powersync_migrate(ctx, 8)?;
123+
powersync_migrate(ctx, LATEST_VERSION)?;
124124

125125
Ok(String::from(""))
126126
}

dart/test/utils/fix_035_fixtures.dart

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ const dataBroken = '''
1818

1919
/// Data after applying the migration fix, but before sync_local
2020
const dataMigrated = '''
21-
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
22-
(1, 'b1', 0, 0, 0, 0, 120, 0),
23-
(2, 'b2', 0, 0, 0, 0, 3, 0)
21+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
22+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
23+
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
2424
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
2525
(1, 1, 'todos', 't1', '', '{}', 100),
2626
(1, 2, 'todos', 't2', '', '{}', 20),
@@ -39,9 +39,9 @@ const dataMigrated = '''
3939

4040
/// Data after applying the migration fix and sync_local
4141
const dataFixed = '''
42-
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
43-
(1, 'b1', 0, 0, 0, 0, 120, 0),
44-
(2, 'b2', 0, 0, 0, 0, 3, 0)
42+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
43+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
44+
(2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0)
4545
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
4646
(1, 1, 'todos', 't1', '', '{}', 100),
4747
(1, 2, 'todos', 't2', '', '{}', 20),

dart/test/utils/migration_fixtures.dart

+59-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// The current database version
2-
const databaseVersion = 8;
2+
const databaseVersion = 9;
33

44
/// This is the base database state that we expect at various schema versions.
55
/// Generated by loading the specific library version, and exporting the schema.
@@ -261,6 +261,52 @@ const expectedState = <int, String>{
261261
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
262262
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
263263
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
264+
''',
265+
9: r'''
266+
;CREATE TABLE ps_buckets(
267+
id INTEGER PRIMARY KEY,
268+
name TEXT NOT NULL,
269+
last_applied_op INTEGER NOT NULL DEFAULT 0,
270+
last_op INTEGER NOT NULL DEFAULT 0,
271+
target_op INTEGER NOT NULL DEFAULT 0,
272+
add_checksum INTEGER NOT NULL DEFAULT 0,
273+
op_checksum INTEGER NOT NULL DEFAULT 0,
274+
pending_delete INTEGER NOT NULL DEFAULT 0
275+
, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT
276+
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
277+
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
278+
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
279+
;CREATE TABLE ps_oplog(
280+
bucket INTEGER NOT NULL,
281+
op_id INTEGER NOT NULL,
282+
row_type TEXT,
283+
row_id TEXT,
284+
key TEXT,
285+
data TEXT,
286+
hash INTEGER NOT NULL) STRICT
287+
;CREATE TABLE ps_sync_state (
288+
priority INTEGER NOT NULL PRIMARY KEY,
289+
last_synced_at TEXT NOT NULL
290+
) STRICT
291+
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
292+
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
293+
;CREATE TABLE ps_updated_rows(
294+
row_type TEXT,
295+
row_id TEXT,
296+
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
297+
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
298+
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
299+
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
300+
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
301+
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
302+
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
303+
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
304+
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
305+
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
306+
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
307+
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
308+
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
309+
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
264310
''',
265311
};
266312

@@ -341,6 +387,17 @@ const data1 = <int, String>{
341387
(2, 3, 'lists', 'l1', '', '{}', 3)
342388
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
343389
('lists', 'l2')
390+
''',
391+
9: r'''
392+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
393+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
394+
(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0)
395+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
396+
(1, 1, 'todos', 't1', '', '{}', 100),
397+
(1, 2, 'todos', 't2', '', '{}', 20),
398+
(2, 3, 'lists', 'l1', '', '{}', 3)
399+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
400+
('lists', 'l2')
344401
'''
345402
};
346403

@@ -384,6 +441,7 @@ final dataDown1 = <int, String>{
384441
5: data1[5]!,
385442
6: data1[5]!,
386443
7: data1[5]!,
444+
8: data1[5]!,
387445
};
388446

389447
final finalData1 = data1[databaseVersion]!;

docs/schema.md

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Internal PowerSync tables
2+
3+
This document is intended as a reference when working on the core PowerSync extension itself.
4+
For informtion relevant to PowerSync users, see [client-architecture](https://docs.powersync.com/architecture/client-architecture#schema).
5+
The document is also incomplete at the moment.
6+
7+
## `ps_migration`
8+
9+
__TODO__: Document
10+
11+
## `ps_buckets`
12+
13+
`ps_buckets` stores information about [buckets](https://docs.powersync.com/architecture/powersync-protocol#buckets) relevant to clients.
14+
A bucket is instantiated for every row returned by a parameter query in a [bucket definition](https://docs.powersync.com/usage/sync-rules/organize-data-into-buckets#organize-data-into-buckets).
15+
16+
Clients create entries in `ps_buckets` when receiving a checkpoint message from the sync service, they are also
17+
responsible for removing buckets that are no longer relevant to the client.
18+
There is also a special `$local` bucket representing pending
19+
uploads.
20+
21+
We store the following information in `ps_buckets`:
22+
23+
1. `id`: Internal (client-side only), alias to rowid for foreign references.
24+
2. `name`: The name of the bucket as received from the sync service.
25+
3. `last_applied_op`: The last operation id that has been verified and published to views (meaning that it was part of
26+
a checkpoint and that we have validated its checksum).
27+
4. `target_op`: Only used for `$local`. TODO: Document further.
28+
5. `add_checksum`: TODO: Document further.
29+
6. `op_checksum`: TODO: Document further.
30+
7. `pending_delete`: TODO: Appears to be unused, document further.
31+
8. `count_at_last`: The amount of operations in the bucket at the last verified (perhaps partial) checkpoint.
32+
9. `count_since_last`: The amount of operations downloaded since the last verified (perhaps partial) checkpoint.
33+
34+
## `ps_crud`
35+
36+
__TODO__: Document
37+
38+
## `ps_kv`
39+
40+
__TODO__: Document
41+
42+
## `ps_oplog`
43+
44+
__TODO__: Document
45+
46+
## `ps_sync_state`
47+
48+
__TODO__: Document
49+
50+
## `ps_tx`
51+
52+
__TODO__: Document
53+
54+
## `ps_untyped`
55+
56+
__TODO__: Document
57+
58+
## `ps_updated_rows`
59+
60+
__TODO__: Document

0 commit comments

Comments
 (0)