Skip to content

Commit b61c374

Browse files
committed
Introduce table for partial sync completions
1 parent 7ad5a13 commit b61c374

File tree

6 files changed

+121
-19
lines changed

6 files changed

+121
-19
lines changed

crates/core/src/kv.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ fn powersync_last_synced_at_impl(
4646
let db = ctx.db_handle();
4747

4848
// language=SQLite
49-
let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
49+
let statement =
50+
db.prepare_v2("select last_synced_at from ps_sync_state where priority = -1")?;
5051

5152
if statement.step()? == ResultCode::ROW {
5253
let client_id = statement.column_text(0)?;
53-
return Ok(Some(client_id.to_string()));
54+
Ok(Some(client_id.to_string()))
5455
} else {
55-
return Ok(None);
56+
Ok(None)
5657
}
5758
}
5859

crates/core/src/migrations.rs

+23
Original file line numberDiff line numberDiff line change
@@ -310,5 +310,28 @@ json_array(
310310
.into_db_result(local_db)?;
311311
}
312312

313+
if current_version < 7 && target_version >= 7 {
314+
local_db
315+
.exec_safe(
316+
"\
317+
CREATE TABLE ps_sync_state (
318+
priority INTEGER NOT NULL,
319+
last_synced_at TEXT NOT NULL
320+
) STRICT;
321+
INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at)
322+
SELECT -1, value from ps_kv where key = 'last_synced_at';
323+
324+
INSERT INTO ps_migration(id, down_migrations)
325+
VALUES(7,
326+
json_array(
327+
json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = -1'),
328+
json_object('sql', 'DROP TABLE ps_sync_state'),
329+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
330+
));
331+
",
332+
)
333+
.into_db_result(local_db)?;
334+
}
335+
313336
Ok(())
314337
}

crates/core/src/sync_local.rs

+31-12
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
use core::ffi::c_void;
2+
13
use alloc::collections::BTreeSet;
4+
use alloc::fmt::format;
25
use alloc::format;
36
use alloc::string::String;
47
use alloc::vec::Vec;
@@ -288,19 +291,35 @@ GROUP BY b.row_type, b.row_id",
288291
}
289292

290293
fn mark_completed(&self) -> Result<(), SQLiteError> {
291-
if self.partial.is_none() {
292-
// language=SQLite
293-
self.db
294-
.exec_safe("DELETE FROM ps_updated_rows")
295-
.into_db_result(self.db)?;
294+
let priority_code = match &self.partial {
295+
None => {
296+
// language=SQLite
297+
self.db
298+
.exec_safe("DELETE FROM ps_updated_rows")
299+
.into_db_result(self.db)?;
300+
-1
301+
}
302+
Some(partial) => partial.priority.into(),
303+
};
296304

297-
// language=SQLite
298-
self.db
299-
.exec_safe(
300-
"insert or replace into ps_kv(key, value) values('last_synced_at', datetime())",
301-
)
302-
.into_db_result(self.db)?;
303-
}
305+
// Higher-priority buckets are always part of lower-priority sync operations too, so we can
306+
// delete information about higher-priority syncs (represented as lower priority numbers).
307+
// A complete sync is represented as -1.
308+
// language=SQLite
309+
let stmt = self
310+
.db
311+
.prepare_v2("DELETE FROM ps_sync_state WHERE (priority < ?1) OR (?1 = -1);")
312+
.into_db_result(self.db)?;
313+
stmt.bind_int(1, priority_code)?;
314+
stmt.exec()?;
315+
316+
// language=SQLite
317+
let stmt = self
318+
.db
319+
.prepare_v2("INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, datetime());")
320+
.into_db_result(self.db)?;
321+
stmt.bind_int(1, priority_code)?;
322+
stmt.exec()?;
304323

305324
Ok(())
306325
}

crates/core/src/view_admin.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ fn powersync_init_impl(
120120

121121
setup_internal_views(local_db)?;
122122

123-
powersync_migrate(ctx, 6)?;
123+
powersync_migrate(ctx, 7)?;
124124

125125
Ok(String::from(""))
126126
}

dart/test/sync_test.dart

+3
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ void main() {
177177
expect(fetchRows(), [
178178
for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'},
179179
]);
180+
181+
expect(db.select('select 1 from ps_sync_state where priority = ?', [i]),
182+
isNotEmpty);
180183
}
181184
});
182185
});

dart/test/utils/migration_fixtures.dart

+59-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// The current database version
2-
const databaseVersion = 6;
2+
const databaseVersion = 7;
33

44
/// This is the base database state that we expect at various schema versions.
55
/// Generated by loading the specific library version, and exporting the schema.
@@ -172,7 +172,51 @@ const expectedState = <int, String>{
172172
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
173173
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
174174
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
175-
'''
175+
''',
176+
7: r'''
177+
;CREATE TABLE ps_buckets(
178+
id INTEGER PRIMARY KEY,
179+
name TEXT NOT NULL,
180+
last_applied_op INTEGER NOT NULL DEFAULT 0,
181+
last_op INTEGER NOT NULL DEFAULT 0,
182+
target_op INTEGER NOT NULL DEFAULT 0,
183+
add_checksum INTEGER NOT NULL DEFAULT 0,
184+
op_checksum INTEGER NOT NULL DEFAULT 0,
185+
pending_delete INTEGER NOT NULL DEFAULT 0
186+
) STRICT
187+
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
188+
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
189+
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
190+
;CREATE TABLE ps_oplog(
191+
bucket INTEGER NOT NULL,
192+
op_id INTEGER NOT NULL,
193+
row_type TEXT,
194+
row_id TEXT,
195+
key TEXT,
196+
data TEXT,
197+
hash INTEGER NOT NULL) STRICT
198+
;CREATE TABLE ps_sync_state (
199+
priority INTEGER NOT NULL,
200+
last_synced_at TEXT NOT NULL
201+
) STRICT
202+
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
203+
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
204+
;CREATE TABLE ps_updated_rows(
205+
row_type TEXT,
206+
row_id TEXT,
207+
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
208+
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
209+
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
210+
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
211+
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
212+
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
213+
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
214+
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
215+
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
216+
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
217+
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
218+
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = -1"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
219+
''',
176220
};
177221

178222
final finalState = expectedState[databaseVersion]!;
@@ -230,6 +274,17 @@ const data1 = <int, String>{
230274
(2, 3, 'lists', 'l1', '', '{}', 3)
231275
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
232276
('lists', 'l2')
277+
''',
278+
7: r'''
279+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
280+
(1, 'b1', 0, 0, 0, 0, 120, 0),
281+
(2, 'b2', 0, 0, 0, 1005, 3, 0)
282+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
283+
(1, 1, 'todos', 't1', '', '{}', 100),
284+
(1, 2, 'todos', 't2', '', '{}', 20),
285+
(2, 3, 'lists', 'l1', '', '{}', 3)
286+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
287+
('lists', 'l2')
233288
'''
234289
};
235290

@@ -270,7 +325,8 @@ final dataDown1 = <int, String>{
270325
('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0),
271326
('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0)
272327
''',
273-
5: data1[5]!
328+
5: data1[5]!,
329+
6: data1[5]!
274330
};
275331

276332
final finalData1 = data1[databaseVersion]!;

0 commit comments

Comments
 (0)