From c1c3540a723d22a8d9dfa81fb2c90c87e6c0ec4c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 29 Jan 2025 17:37:09 +0100 Subject: [PATCH 01/13] Add priority field to buckets --- crates/core/src/bucket_priority.rs | 46 ++++++++++++++++++++++++++++++ crates/core/src/lib.rs | 1 + crates/core/src/migrations.rs | 16 +++++++++++ crates/core/src/operations_vtab.rs | 11 ++++--- crates/core/src/sync_local.rs | 43 +++++++++++++++++++++------- crates/core/src/view_admin.rs | 2 +- 6 files changed, 101 insertions(+), 18 deletions(-) create mode 100644 crates/core/src/bucket_priority.rs diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs new file mode 100644 index 0000000..daac330 --- /dev/null +++ b/crates/core/src/bucket_priority.rs @@ -0,0 +1,46 @@ +use sqlite_nostd::ResultCode; + +use crate::error::SQLiteError; + +#[repr(transparent)] +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct BucketPriority(i32); + +impl BucketPriority { + pub fn may_publish_with_outstanding_uploads(self) -> bool { + self.0 == 0 + } + + pub const HIGHEST: BucketPriority = BucketPriority(0); + pub const LOWEST: BucketPriority = BucketPriority(3); +} + +impl TryFrom<i32> for BucketPriority { + type Error = SQLiteError; + + fn try_from(value: i32) -> Result<Self, Self::Error> { + if value < BucketPriority::LOWEST.0 || value > BucketPriority::HIGHEST.0 { + return Err(SQLiteError::from(ResultCode::MISUSE)); + } + + return Ok(BucketPriority(value)); + } +} + +impl Default for BucketPriority { + fn default() -> Self { + Self(1) + } +} + +impl Into<i32> for BucketPriority { + fn into(self) -> i32 { + self.0 + } +} + +impl PartialOrd for BucketPriority { + fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> { + Some(self.0.partial_cmp(&other.0)?.reverse()) + } +}
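A note on the ordering defined by partial_cmp above, since the reversed comparison is easy to misread: a numerically smaller priority value compares as greater, meaning it is more important. A minimal sketch of the resulting semantics, not part of the patch and using only the two constants defined above:

    // BucketPriority(0) is the highest priority: it compares as *greater*
    // than BucketPriority(3), although its wrapped integer is smaller.
    assert!(BucketPriority::HIGHEST > BucketPriority::LOWEST);
    assert!(BucketPriority::LOWEST < BucketPriority::HIGHEST);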
"clear_remove_ops" { - let result = clear_remove_ops(db, data); + let result = clear_remove_ops(db, args[3].text()); vtab_result(vtab, result) } else if op == "delete_pending_buckets" { - let result = delete_pending_buckets(db, data); + let result = delete_pending_buckets(db, args[3].text()); vtab_result(vtab, result) } else if op == "delete_bucket" { - let result = delete_bucket(db, data); + let result = delete_bucket(db, args[3].text()); vtab_result(vtab, result) } else { ResultCode::MISUSE as c_int diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 0e6602e..c5e4b0c 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -2,21 +2,30 @@ use alloc::collections::BTreeSet; use alloc::format; use alloc::string::String; +use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; -use sqlite_nostd as sqlite; +use sqlite_nostd::{self as sqlite, Value}; use sqlite_nostd::{ColumnType, Connection, ResultCode}; use crate::ext::SafeManagedStmt; use crate::util::{internal_table_name, quote_internal_name}; -pub fn can_update_local(db: *mut sqlite::sqlite3) -> Result { +fn can_apply_sync_changes( + db: *mut sqlite::sqlite3, + priority: BucketPriority, +) -> Result { + // We can only make sync changes visible if data is consistent, meaning that we've seen the + // target operation sent in the original checkpoint message. We allow weakening consistency when + // buckets from different priorities are involved (buckets with higher priorities or a lower + // priority number can be published before we've reached the checkpoint for other buckets). // language=SQLite let statement = db.prepare_v2( "\ SELECT group_concat(name) FROM ps_buckets -WHERE target_op > last_op", +WHERE (target_op > last_op) AND (priority <= ?)", )?; + statement.bind_int(1, priority.into())?; if statement.step()? != ResultCode::ROW { return Err(SQLiteError::from(ResultCode::ABORT)); @@ -26,22 +35,34 @@ WHERE target_op > last_op", return Ok(false); } - // This is specifically relevant for when data is added to crud before another batch is completed. - - // language=SQLite - let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?; - if statement.step()? != ResultCode::DONE { - return Ok(false); + // Don't publish downloaded data until the upload queue is empty (except for downloaded data in + // priority 0, which is published earlier). + if !priority.may_publish_with_outstanding_uploads() { + let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?; + if statement.step()? != ResultCode::DONE { + return Ok(false); + } } Ok(true) } -pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result { - if !can_update_local(db)? { +pub fn sync_local(db: *mut sqlite::sqlite3, data: &V) -> Result { + let priority = match data.value_type() { + ColumnType::Integer => BucketPriority::try_from(data.int()), + ColumnType::Float => BucketPriority::try_from(data.double() as i32), + // Older clients without bucket priority support typically send an empty string here. + _ => Ok(BucketPriority::LOWEST), + }?; + + if !can_apply_sync_changes(db, priority)? 
{ return Ok(0); } + if priority >= BucketPriority::LOWEST { + todo!("Only consider changes from certain bucket priorities") + } + // language=SQLite let statement = db .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'") diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index b58ebaf..16f7a7f 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -120,7 +120,7 @@ fn powersync_init_impl( setup_internal_views(local_db)?; - powersync_migrate(ctx, 6)?; + powersync_migrate(ctx, 7)?; Ok(String::from("")) } From d34972bfd7ccabcf7cf0cf5d9815a1ca9500dee2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 29 Jan 2025 18:00:54 +0100 Subject: [PATCH 02/13] Create bucket with correct priority --- crates/core/src/operations.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index beb7b8b..742fdbe 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,6 +1,7 @@ use alloc::format; use alloc::string::String; +use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; @@ -17,8 +18,11 @@ SELECT json_extract(e.value, '$.data') as data, json_extract(e.value, '$.has_more') as has_more, json_extract(e.value, '$.after') as after, - json_extract(e.value, '$.next_after') as next_after -FROM json_each(json_extract(?, '$.buckets')) e", + json_extract(e.value, '$.next_after') as next_after, + json_extract(d.value, '$.priority') as priority, +FROM json_each(json_extract(?1, '$.buckets')) e + LEFT OUTER JOIN json_each(json_extract(?1, '$.descriptions')) d + ON json_extract(e.value, '$.bucket') == d.key", )?; statement.bind_text(1, data, sqlite::Destructor::STATIC)?; @@ -28,8 +32,15 @@ FROM json_each(json_extract(?, '$.buckets')) e", // let _has_more = statement.column_int(2)? != 0; // let _after = statement.column_text(3)?; // let _next_after = statement.column_text(4)?; + let priority = match statement.column_type(5)? { + sqlite_nostd::ColumnType::Integer => { + BucketPriority::try_from(statement.column_int(5)?).ok() + } + _ => None, + } + .unwrap_or_default(); - insert_bucket_operations(db, bucket, data)?; + insert_bucket_operations(db, bucket, data, priority)?; } Ok(()) @@ -39,6 +50,7 @@ pub fn insert_bucket_operations( db: *mut sqlite::sqlite3, bucket: &str, data: &str, + priority: BucketPriority, ) -> Result<(), SQLiteError> { // Statement to insert new operations (only for PUT and REMOVE). // language=SQLite @@ -60,13 +72,14 @@ FROM json_each(?) e", // We can consider splitting this into separate SELECT and INSERT statements. // language=SQLite let bucket_statement = db.prepare_v2( - "INSERT INTO ps_buckets(name) - VALUES(?) + "INSERT INTO ps_buckets(name, priority) + VALUES(?, ?) 
ON CONFLICT DO UPDATE SET last_applied_op = last_applied_op RETURNING id, last_applied_op", )?; bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; + bucket_statement.bind_int(2, priority.into()); bucket_statement.step()?; let bucket_id = bucket_statement.column_int64(0)?; From ee6ea6b2490f53c900b1dfddc369e8cb655c1d46 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Jan 2025 11:45:49 +0100 Subject: [PATCH 03/13] Fix test --- Cargo.toml | 1 - crates/core/src/migrations.rs | 4 +- crates/core/src/operations.rs | 2 +- crates/core/src/sync_local.rs | 9 ++-- crates/sqlite/build.rs | 1 - dart/test/utils/fix_035_fixtures.dart | 12 +++--- dart/test/utils/migration_fixtures.dart | 56 ++++++++++++++++++++++++- dart/test/utils/native_test_utils.dart | 37 +++++++--------- 8 files changed, 82 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 790a462..024b6c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ default-members = ["crates/shell", "crates/sqlite"] [profile.dev] panic = "abort" -strip = true [profile.release] panic = "abort" diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index fc010ef..157944a 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -314,9 +314,9 @@ json_array( local_db .exec_safe( "\ -ALTER TABLE ps_buckets ADD COLUMN priority NOT NULL DEFAULT 1; +ALTER TABLE ps_buckets ADD COLUMN priority INTEGER NOT NULL DEFAULT 1; INSERT INTO ps_migration(id, down_migrations) -VALUES(6, +VALUES(7, json_array( json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN priority'), json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7') )); diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 742fdbe..190aa2e 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -79,7 +79,7 @@ FROM json_each(?) e", RETURNING id, last_applied_op", )?; bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - bucket_statement.bind_int(2, priority.into()); + bucket_statement.bind_int(2, priority.into())?; bucket_statement.step()?; let bucket_id = bucket_statement.column_int64(0)?; diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index c5e4b0c..bc18a35 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -59,10 +59,6 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> { - if priority >= BucketPriority::LOWEST { - todo!("Only consider changes from certain bucket priorities") - } - // language=SQLite let statement = db .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'") @@ -84,8 +80,8 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> { WITH updated_rows AS ( SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets - CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op) + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op) + WHERE buckets.priority <= ? UNION SELECT row_type, row_id FROM ps_updated_rows ) @@ -108,6 +104,7 @@ GROUP BY b.row_type, b.row_id", // TODO: cache statements + statement.bind_int(1, priority.into())?; while statement.step().into_db_result(db)? 
== ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; diff --git a/crates/sqlite/build.rs b/crates/sqlite/build.rs index 01382c9..20b2953 100644 --- a/crates/sqlite/build.rs +++ b/crates/sqlite/build.rs @@ -1,4 +1,3 @@ - fn main() { let mut cfg = cc::Build::new(); diff --git a/dart/test/utils/fix_035_fixtures.dart b/dart/test/utils/fix_035_fixtures.dart index 22fc581..78cba7e 100644 --- a/dart/test/utils/fix_035_fixtures.dart +++ b/dart/test/utils/fix_035_fixtures.dart @@ -18,9 +18,9 @@ const dataBroken = ''' /// Data after applying the migration fix, but before sync_local const dataMigrated = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0), - (2, 'b2', 0, 0, 0, 0, 3, 0) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 1), + (2, 'b2', 0, 0, 0, 0, 3, 0, 1) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), @@ -39,9 +39,9 @@ const dataMigrated = ''' /// Data after applying the migration fix and sync_local const dataFixed = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0), - (2, 'b2', 0, 0, 0, 0, 3, 0) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 1), + (2, 'b2', 0, 0, 0, 0, 3, 0, 1) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 3797163..c9f387b 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 6; +const databaseVersion = 7; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. 
@@ -172,6 +172,46 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +''', + 7: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, priority INTEGER NOT NULL DEFAULT 1) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY 
KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN priority"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') ''' }; @@ -230,6 +270,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 7: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, 
target_op, add_checksum, op_checksum, pending_delete, priority) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 1), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 1) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -270,7 +321,8 @@ final dataDown1 = { ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0) ''', - 5: data1[5]! + 5: data1[5]!, + 6: data1[5]!, }; final finalData1 = data1[databaseVersion]!; diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index 284142e..6481d4e 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -3,6 +3,7 @@ import 'dart:ffi'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/open.dart' as sqlite_open; import 'package:sqlite3/sqlite3.dart'; +import 'package:path/path.dart' as p; const defaultSqlitePath = 'libsqlite3.so.0'; @@ -13,7 +14,8 @@ CommonDatabase openTestDatabase() { return DynamicLibrary.open('libsqlite3.so.0'); }); sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { - return DynamicLibrary.open('libsqlite3.dylib'); + return DynamicLibrary.open( + '/opt/homebrew/Cellar/sqlite/3.48.0/lib/libsqlite3.dylib'); }); var lib = DynamicLibrary.open(getLibraryForPlatform(path: libPath)); var extension = SqliteExtension.inLibrary(lib, 'sqlite3_powersync_init'); @@ -22,29 +24,22 @@ CommonDatabase openTestDatabase() { } String getLibraryForPlatform({String? path = "."}) { - switch (Abi.current()) { - case Abi.androidArm: - case Abi.androidArm64: - case Abi.androidX64: - return '$path/libpowersync.so'; - case Abi.macosArm64: - case Abi.macosX64: - return '$path/libpowersync.dylib'; - case Abi.linuxX64: - case Abi.linuxArm64: - return '$path/libpowersync.so'; - case Abi.windowsX64: - return '$path/powersync.dll'; - case Abi.androidIA32: - throw ArgumentError( + return p.normalize(p.absolute(switch (Abi.current()) { + Abi.androidArm || + Abi.androidArm64 || + Abi.androidX64 => + '$path/libpowersync.so', + Abi.macosArm64 || Abi.macosX64 => '$path/libpowersync.dylib', + Abi.linuxX64 || Abi.linuxArm64 => '$path/libpowersync.so', + Abi.windowsX64 => '$path/powersync.dll', + Abi.androidIA32 => throw ArgumentError( 'Unsupported processor architecture. X86 Android emulators are not ' 'supported. Please use an x86_64 emulator instead. All physical ' 'Android devices are supported including 32bit ARM.', - ); - default: - throw ArgumentError( + ), + _ => throw ArgumentError( 'Unsupported processor architecture "${Abi.current()}". 
' 'Please open an issue on GitHub to request it.', ) })); } From c695b1fbd80edf9119a7aa7ff627820b1ffb5d93 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Jan 2025 15:06:26 +0100 Subject: [PATCH 04/13] Add basic tests around priorities --- crates/core/src/bucket_priority.rs | 6 +- crates/core/src/operations.rs | 2 +- crates/core/src/sync_local.rs | 46 +++++--- dart/pubspec.lock | 179 ++++++++++++++++------------- dart/test/sync_test.dart | 178 ++++++++++++++++++++++++++++ 5 files changed, 315 insertions(+), 96 deletions(-) create mode 100644 dart/test/sync_test.dart diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs index daac330..6a806bc 100644 --- a/crates/core/src/bucket_priority.rs +++ b/crates/core/src/bucket_priority.rs @@ -1,3 +1,5 @@ +use core::ops::RangeInclusive; + use sqlite_nostd::ResultCode; use crate::error::SQLiteError; @@ -19,7 +21,9 @@ impl TryFrom<i32> for BucketPriority { type Error = SQLiteError; fn try_from(value: i32) -> Result<Self, Self::Error> { - if value < BucketPriority::LOWEST.0 || value > BucketPriority::HIGHEST.0 { + const VALID: RangeInclusive<i32> = (BucketPriority::HIGHEST.0)..=(BucketPriority::LOWEST.0); + + if !VALID.contains(&value) { return Err(SQLiteError::from(ResultCode::MISUSE)); } return Ok(BucketPriority(value)); diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 190aa2e..faa2472 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -19,7 +19,7 @@ SELECT json_extract(e.value, '$.has_more') as has_more, json_extract(e.value, '$.after') as after, json_extract(e.value, '$.next_after') as next_after, - json_extract(d.value, '$.priority') as priority, + json_extract(d.value, '$.priority') as priority FROM json_each(json_extract(?1, '$.buckets')) e LEFT OUTER JOIN json_each(json_extract(?1, '$.descriptions')) d ON json_extract(e.value, '$.bucket') == d.key", diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index bc18a35..f24c0ba 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -79,15 +79,16 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> { WITH updated_rows AS ( - SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets + SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op) - WHERE buckets.priority <= ? - UNION SELECT row_type, row_id FROM ps_updated_rows + WHERE buckets.priority <= ?1 + UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows ) -- 3. Group the objects from different buckets together into a single one (ops). SELECT b.row_type as type, b.row_id as id, + b.local as local, r.data as data, count(r.bucket) as buckets, /* max() affects which row is used for 'data' */ @@ -97,6 +98,7 @@ FROM updated_rows b LEFT OUTER JOIN ps_oplog AS r ON r.row_type = b.row_type AND r.row_id = b.row_id + AND (SELECT priority FROM ps_buckets WHERE id = r.bucket) <= ?1 -- Group for (3) GROUP BY b.row_type, b.row_id", ) @@ -108,11 +110,18 @@ GROUP BY b.row_type, b.row_id", // TODO: cache statements + statement.bind_int(1, priority.into())?; while statement.step().into_db_result(db)? == ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; - let buckets = statement.column_int(3)?; - let data = statement.column_text(2); + let local = statement.column_int(2)? == 1; + let buckets = statement.column_int(4)?; + let data = statement.column_text(3); let table_name = internal_table_name(type_name); + if local && buckets == 0 && priority == BucketPriority::HIGHEST { + // These rows are still local and they haven't been uploaded yet (which we allow for + // buckets with priority=0 completing). We should just keep them around. 
+ continue; + } + if tables.contains(&table_name) { let quoted = quote_internal_name(type_name, false); @@ -157,20 +166,27 @@ GROUP BY b.row_type, b.row_id", } // language=SQLite - db.exec_safe( - "UPDATE ps_buckets + let updated = db + .prepare_v2( + "UPDATE ps_buckets SET last_applied_op = last_op - WHERE last_applied_op != last_op", - ) - .into_db_result(db)?; - - // language=SQLite - db.exec_safe("DELETE FROM ps_updated_rows") + WHERE last_applied_op != last_op AND priority <= ?", + ) .into_db_result(db)?; + updated.bind_int(1, priority.into())?; + updated.exec()?; - // language=SQLite - db.exec_safe("insert or replace into ps_kv(key, value) values('last_synced_at', datetime())") + if priority == BucketPriority::LOWEST { + // language=SQLite + db.exec_safe("DELETE FROM ps_updated_rows") + .into_db_result(db)?; + + // language=SQLite + db.exec_safe( + "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())", + ) .into_db_result(db)?; + } Ok(1) } diff --git a/dart/pubspec.lock b/dart/pubspec.lock index 53c0592..bdc6d4a 100644 --- a/dart/pubspec.lock +++ b/dart/pubspec.lock @@ -5,74 +5,79 @@ packages: dependency: transitive description: name: _fe_analyzer_shared - sha256: "0b2f2bd91ba804e53a61d757b986f89f1f9eaed5b11e4b2f5a2468d86d6c9fc7" + sha256: "03f6da266a27a4538a69295ec142cb5717d7d4e5727b84658b63e1e1509bac9c" url: "https://pub.dev" source: hosted - version: "67.0.0" + version: "79.0.0" + _macros: + dependency: transitive + description: dart + source: sdk + version: "0.3.3" analyzer: dependency: transitive description: name: analyzer - sha256: "37577842a27e4338429a1cbc32679d508836510b056f1eedf0c8d20e39c1383d" + sha256: c9040fc56483c22a5e04a9f6a251313118b1a3c42423770623128fa484115643 url: "https://pub.dev" source: hosted - version: "6.4.1" + version: "7.2.0" args: dependency: transitive description: name: args - sha256: "7cf60b9f0cc88203c5a190b4cd62a99feea42759a7fa695010eb5de1c0b2252a" + sha256: bf9f5caeea8d8fe6721a9c358dd8a5c1947b27f1cfaa18b39c301273594919e6 url: "https://pub.dev" source: hosted - version: "2.5.0" + version: "2.6.0" async: dependency: transitive description: name: async - sha256: "947bfcf187f74dbc5e146c9eb9c0f10c9f8b30743e341481c1e2ed3ecc18c20c" + sha256: d2872f9c19731c2e5f10444b14686eb7cc85c76274bd6c16e1816bff9a3bab63 url: "https://pub.dev" source: hosted - version: "2.11.0" + version: "2.12.0" boolean_selector: dependency: transitive description: name: boolean_selector - sha256: "6cfb5af12253eaf2b368f07bacc5a80d1301a071c73360d746b7f2e32d762c66" + sha256: "8aab1771e1243a5063b8b0ff68042d67334e3feab9e95b9490f9a6ebf73b42ea" url: "https://pub.dev" source: hosted - version: "2.1.1" + version: "2.1.2" collection: dependency: transitive description: name: collection - sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a + sha256: "2f5709ae4d3d59dd8f7cd309b4e023046b57d8a6c82130785d2b0e5868084e76" url: "https://pub.dev" source: hosted - version: "1.18.0" + version: "1.19.1" convert: dependency: transitive description: name: convert - sha256: "0f08b14755d163f6e2134cb58222dd25ea2a2ee8a195e53983d57c075324d592" + sha256: b30acd5944035672bc15c6b7a8b47d773e41e2f17de064350988c5d02adb1c68 url: "https://pub.dev" source: hosted - version: "3.1.1" + version: "3.1.2" coverage: dependency: transitive description: name: coverage - sha256: c1fb2dce3c0085f39dc72668e85f8e0210ec7de05345821ff58530567df345a5 + sha256: e3493833ea012784c740e341952298f1cc77f1f01b1bbc3eb4eecf6984fb7f43 url: "https://pub.dev" source: hosted - version: "1.9.2" + version: 
"1.11.1" crypto: dependency: transitive description: name: crypto - sha256: ec30d999af904f33454ba22ed9a86162b35e52b44ac4807d1d93c288041d7d27 + sha256: "1e445881f28f22d6140f181e07737b22f1e099a5e1ff94b0af2f9e4a463f4855" url: "https://pub.dev" source: hosted - version: "3.0.5" + version: "3.0.6" ffi: dependency: transitive description: @@ -85,10 +90,10 @@ packages: dependency: transitive description: name: file - sha256: "5fc22d7c25582e38ad9a8515372cd9a93834027aacf1801cf01164dac0ffa08c" + sha256: a3b4f84adafef897088c160faf7dfffb7696046cb13ae90b508c2cbc95d3b8d4 url: "https://pub.dev" source: hosted - version: "7.0.0" + version: "7.0.1" frontend_server_client: dependency: transitive description: @@ -101,34 +106,34 @@ packages: dependency: transitive description: name: glob - sha256: "0e7014b3b7d4dac1ca4d6114f82bf1782ee86745b9b42a92c9289c23d8a0ab63" + sha256: c3f1ee72c96f8f78935e18aa8cecced9ab132419e8625dc187e1c2408efc20de url: "https://pub.dev" source: hosted - version: "2.1.2" + version: "2.1.3" http_multi_server: dependency: transitive description: name: http_multi_server - sha256: "97486f20f9c2f7be8f514851703d0119c3596d14ea63227af6f7a481ef2b2f8b" + sha256: aa6199f908078bb1c5efb8d8638d4ae191aac11b311132c3ef48ce352fb52ef8 url: "https://pub.dev" source: hosted - version: "3.2.1" + version: "3.2.2" http_parser: dependency: transitive description: name: http_parser - sha256: "2aa08ce0341cc9b354a498388e30986515406668dbcc4f7c950c3e715496693b" + sha256: "178d74305e7866013777bab2c3d8726205dc5a4dd935297175b19a23a2e66571" url: "https://pub.dev" source: hosted - version: "4.0.2" + version: "4.1.2" io: dependency: transitive description: name: io - sha256: "2ec25704aba361659e10e3e5f5d672068d332fc8ac516421d483a11e5cbd061e" + sha256: dfd5a80599cf0165756e3181807ed3e77daf6dd4137caaad72d0b7931597650b url: "https://pub.dev" source: hosted - version: "1.0.4" + version: "1.0.5" js: dependency: transitive description: @@ -141,34 +146,42 @@ packages: dependency: transitive description: name: logging - sha256: "623a88c9594aa774443aa3eb2d41807a48486b5613e67599fb4c41c0ad47c340" + sha256: c8245ada5f1717ed44271ed1c26b8ce85ca3228fd2ffdb75468ab01979309d61 url: "https://pub.dev" source: hosted - version: "1.2.0" + version: "1.3.0" + macros: + dependency: transitive + description: + name: macros + sha256: "1d9e801cd66f7ea3663c45fc708450db1fa57f988142c64289142c9b7ee80656" + url: "https://pub.dev" + source: hosted + version: "0.1.3-main.0" matcher: dependency: transitive description: name: matcher - sha256: d2323aa2060500f906aa31a895b4030b6da3ebdcc5619d14ce1aada65cd161cb + sha256: dc58c723c3c24bf8d3e2d3ad3f2f9d7bd9cf43ec6feaa64181775e60190153f2 url: "https://pub.dev" source: hosted - version: "0.12.16+1" + version: "0.12.17" meta: dependency: transitive description: name: meta - sha256: "7687075e408b093f36e6bbf6c91878cc0d4cd10f409506f7bc996f68220b9136" + sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c url: "https://pub.dev" source: hosted - version: "1.12.0" + version: "1.16.0" mime: dependency: transitive description: name: mime - sha256: "801fd0b26f14a4a58ccb09d5892c3fbdeff209594300a542492cf13fba9d247a" + sha256: "41a20518f0cb1256669420fdba0cd90d21561e560ac240f26ef8322e45bb7ed6" url: "https://pub.dev" source: hosted - version: "1.0.6" + version: "2.0.0" node_preamble: dependency: transitive description: @@ -181,18 +194,18 @@ packages: dependency: transitive description: name: package_config - sha256: "1c5b77ccc91e4823a5af61ee74e6b972db1ef98c2ff5a18d3161c982a55448bd" + sha256: 
"92d4488434b520a62570293fbd33bb556c7d49230791c1b4bbd973baf6d2dc67" url: "https://pub.dev" source: hosted - version: "2.1.0" + version: "2.1.1" path: dependency: transitive description: name: path - sha256: "087ce49c3f0dc39180befefc60fdb4acd8f8620e5682fe2476afd0b3688bb4af" + sha256: "75cca69d1490965be98c73ceaea117e8a04dd21217b37b292c9ddbec0d955bc5" url: "https://pub.dev" source: hosted - version: "1.9.0" + version: "1.9.1" pool: dependency: transitive description: @@ -205,18 +218,18 @@ packages: dependency: transitive description: name: pub_semver - sha256: "40d3ab1bbd474c4c2328c91e3a7df8c6dd629b79ece4c4bd04bee496a224fb0c" + sha256: "7b3cfbf654f3edd0c6298ecd5be782ce997ddf0e00531b9464b55245185bbbbd" url: "https://pub.dev" source: hosted - version: "2.1.4" + version: "2.1.5" shelf: dependency: transitive description: name: shelf - sha256: ad29c505aee705f41a4d8963641f91ac4cee3c8fad5947e033390a7bd8180fa4 + sha256: e7dd780a7ffb623c57850b33f43309312fc863fb6aa3d276a754bb299839ef12 url: "https://pub.dev" source: hosted - version: "1.4.1" + version: "1.4.2" shelf_packages_handler: dependency: transitive description: @@ -237,10 +250,10 @@ packages: dependency: transitive description: name: shelf_web_socket - sha256: "9ca081be41c60190ebcb4766b2486a7d50261db7bd0f5d9615f2d653637a84c1" + sha256: cc36c297b52866d203dbf9332263c94becc2fe0ceaa9681d07b6ef9807023b67 url: "https://pub.dev" source: hosted - version: "1.0.4" + version: "2.0.1" source_map_stack_trace: dependency: transitive description: @@ -253,122 +266,130 @@ packages: dependency: transitive description: name: source_maps - sha256: "708b3f6b97248e5781f493b765c3337db11c5d2c81c3094f10904bfa8004c703" + sha256: "190222579a448b03896e0ca6eca5998fa810fda630c1d65e2f78b3f638f54812" url: "https://pub.dev" source: hosted - version: "0.10.12" + version: "0.10.13" source_span: dependency: transitive description: name: source_span - sha256: "53e943d4206a5e30df338fd4c6e7a077e02254531b138a15aec3bd143c1a8b3c" + sha256: "254ee5351d6cb365c859e20ee823c3bb479bf4a293c22d17a9f1bf144ce86f7c" url: "https://pub.dev" source: hosted - version: "1.10.0" + version: "1.10.1" sqlite3: dependency: "direct main" description: name: sqlite3 - sha256: "45f168ae2213201b54e09429ed0c593dc2c88c924a1488d6f9c523a255d567cb" + sha256: "35d3726fe18ab1463403a5cc8d97dbc81f2a0b08082e8173851363fcc97b6627" url: "https://pub.dev" source: hosted - version: "2.4.6" + version: "2.7.2" stack_trace: dependency: transitive description: name: stack_trace - sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b" + sha256: "8b27215b45d22309b5cddda1aa2b19bdfec9df0e765f2de506401c071d38d1b1" url: "https://pub.dev" source: hosted - version: "1.11.1" + version: "1.12.1" stream_channel: dependency: transitive description: name: stream_channel - sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7 + sha256: "969e04c80b8bcdf826f8f16579c7b14d780458bd97f56d107d3950fdbeef059d" url: "https://pub.dev" source: hosted - version: "2.1.2" + version: "2.1.4" string_scanner: dependency: transitive description: name: string_scanner - sha256: "688af5ed3402a4bde5b3a6c15fd768dbf2621a614950b17f04626c431ab3c4c3" + sha256: "921cd31725b72fe181906c6a94d987c78e3b98c2e205b397ea399d4054872b43" url: "https://pub.dev" source: hosted - version: "1.3.0" + version: "1.4.1" term_glyph: dependency: transitive description: name: term_glyph - sha256: a29248a84fbb7c79282b40b8c72a1209db169a2e0542bce341da992fe1bc7e84 + sha256: "7f554798625ea768a7518313e58f83891c7f5024f88e46e7182a4558850a4b8e" url: 
"https://pub.dev" source: hosted - version: "1.2.1" + version: "1.2.2" test: dependency: "direct dev" description: name: test - sha256: "7ee446762c2c50b3bd4ea96fe13ffac69919352bd3b4b17bac3f3465edc58073" + sha256: "8391fbe68d520daf2314121764d38e37f934c02fd7301ad18307bd93bd6b725d" url: "https://pub.dev" source: hosted - version: "1.25.2" + version: "1.25.14" test_api: dependency: transitive description: name: test_api - sha256: "9955ae474176f7ac8ee4e989dadfb411a58c30415bcfb648fa04b2b8a03afa7f" + sha256: fb31f383e2ee25fbbfe06b40fe21e1e458d14080e3c67e7ba0acfde4df4e0bbd url: "https://pub.dev" source: hosted - version: "0.7.0" + version: "0.7.4" test_core: dependency: transitive description: name: test_core - sha256: "2bc4b4ecddd75309300d8096f781c0e3280ca1ef85beda558d33fcbedc2eead4" + sha256: "84d17c3486c8dfdbe5e12a50c8ae176d15e2a771b96909a9442b40173649ccaa" url: "https://pub.dev" source: hosted - version: "0.6.0" + version: "0.6.8" typed_data: dependency: transitive description: name: typed_data - sha256: facc8d6582f16042dd49f2463ff1bd6e2c9ef9f3d5da3d9b087e244a7b564b3c + sha256: f9049c039ebfeb4cf7a7104a675823cd72dba8297f264b6637062516699fa006 url: "https://pub.dev" source: hosted - version: "1.3.2" + version: "1.4.0" vm_service: dependency: transitive description: name: vm_service - sha256: "5c5f338a667b4c644744b661f309fb8080bb94b18a7e91ef1dbd343bed00ed6d" + sha256: ddfa8d30d89985b96407efce8acbdd124701f96741f2d981ca860662f1c0dc02 url: "https://pub.dev" source: hosted - version: "14.2.5" + version: "15.0.0" watcher: dependency: transitive description: name: watcher - sha256: "3d2ad6751b3c16cf07c7fca317a1413b3f26530319181b37e3b9039b84fc01d8" + sha256: "69da27e49efa56a15f8afe8f4438c4ec02eff0a117df1b22ea4aad194fe1c104" url: "https://pub.dev" source: hosted - version: "1.1.0" + version: "1.1.1" web: dependency: transitive description: name: web - sha256: d43c1d6b787bf0afad444700ae7f4db8827f701bc61c255ac8d328c6f4d52062 + sha256: cd3543bd5798f6ad290ea73d210f423502e71900302dde696f8bff84bf89a1cb url: "https://pub.dev" source: hosted - version: "1.0.0" + version: "1.1.0" + web_socket: + dependency: transitive + description: + name: web_socket + sha256: "3c12d96c0c9a4eec095246debcea7b86c0324f22df69893d538fcc6f1b8cce83" + url: "https://pub.dev" + source: hosted + version: "0.1.6" web_socket_channel: dependency: transitive description: name: web_socket_channel - sha256: d88238e5eac9a42bb43ca4e721edba3c08c6354d4a53063afaa568516217621b + sha256: "0b8e2457400d8a859b7b2030786835a28a8e80836ef64402abef392ff4f1d0e5" url: "https://pub.dev" source: hosted - version: "2.4.0" + version: "3.0.2" webkit_inspection_protocol: dependency: transitive description: @@ -381,9 +402,9 @@ packages: dependency: transitive description: name: yaml - sha256: "75769501ea3489fca56601ff33454fe45507ea3bfb014161abc3b43ae25989d5" + sha256: b9da305ac7c39faa3f030eccd175340f968459dae4af175130b3fc47e40d76ce url: "https://pub.dev" source: hosted - version: "3.1.2" + version: "3.1.3" sdks: - dart: ">=3.4.0 <4.0.0" + dart: ">=3.5.0 <4.0.0" diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart new file mode 100644 index 0000000..da8ae38 --- /dev/null +++ b/dart/test/sync_test.dart @@ -0,0 +1,178 @@ +import 'dart:convert'; + +import 'package:sqlite3/common.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; + +void main() { + group('sync tests', () { + late CommonDatabase db; + + setUp(() async { + db = openTestDatabase() + ..select('select powersync_init();') + ..select('select 
powersync_replace_schema(?)', [json.encode(_schema)]); + }); + + tearDown(() { + db.dispose(); + }); + + void pushSyncData( + String bucket, + String opId, + String rowId, + Object op, + Object? data, { + Object? descriptions = _bucketDescriptions, + }) { + final encoded = json.encode({ + 'buckets': [ + { + 'bucket': bucket, + 'data': [ + { + 'op_id': opId, + 'op': op, + 'object_type': 'items', + 'object_id': rowId, + 'checksum': 0, + 'data': data, + } + ], + } + ], + if (descriptions != null) 'descriptions': descriptions, + }); + + db.execute('insert into powersync_operations (op, data) VALUES (?, ?);', + ['save', encoded]); + } + + bool pushCheckpointComplete( + String lastOpId, String? writeCheckpoint, List checksums, + {int? priority}) { + final [row] = db.select('select powersync_validate_checkpoint(?) as r;', [ + json.encode({ + 'last_op_id': lastOpId, + 'write_checkpoint': writeCheckpoint, + 'buckets': checksums, + }) + ]); + + final decoded = json.decode(row['r']); + if (decoded['valid'] != true) { + fail(decoded); + } + + db.execute( + 'UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))', + [ + lastOpId, + json.encode(checksums.map((e) => (e as Map)['bucket']).toList()) + ], + ); + + db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', + ['sync_local', priority]); + return db.lastInsertRowId == 1; + } + + ResultSet fetchRows() { + return db.select('select * from items'); + } + + test('does not publish until reaching checkpoint', () { + expect(fetchRows(), isEmpty); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect(fetchRows(), isEmpty); + + expect(pushCheckpointComplete('1', null, [_bucketChecksum('prio1', 0)]), + isTrue); + expect(fetchRows(), [ + {'id': 'row-0', 'col': 'hi'} + ]); + }); + + test('does not publish with pending local data', () { + expect(fetchRows(), isEmpty); + db.execute("insert into items (id, col) values ('local', 'data');"); + expect(fetchRows(), isNotEmpty); + + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect(pushCheckpointComplete('1', null, [_bucketChecksum('prio1', 0)]), + isFalse); + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'} + ]); + }); + + test('publishes with local data for prio=0 buckets', () { + expect(fetchRows(), isEmpty); + db.execute("insert into items (id, col) values ('local', 'data');"); + expect(fetchRows(), isNotEmpty); + + pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect( + pushCheckpointComplete( + '1', + null, + [_bucketChecksum('prio0', 0)], + priority: 0, + ), + isTrue, + ); + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'}, + {'id': 'row-0', 'col': 'hi'}, + ]); + }); + + test('can publish partial checkpoints under different priorities', () { + for (var i = 0; i < 4; i++) { + pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'}); + } + expect(fetchRows(), isEmpty); + + // Simulate a partial checkpoint complete for each of the buckets. + for (var i = 0; i < 4; i++) { + expect( + pushCheckpointComplete( + '1', + null, + [_bucketChecksum('prio$i', 0)], + priority: i, + ), + isTrue, + ); + + expect(fetchRows(), [ + for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'}, + ]); + } + }); + }); +} + +Object? 
_bucketChecksum(String bucket, int checksum) { + return {'bucket': bucket, 'checksum': checksum}; +} + +const _schema = { + 'tables': [ + { + 'name': 'items', + 'columns': [ + {'name': 'col', 'type': 'text'} + ], + } + ] +}; + +const _bucketDescriptions = { + 'prio0': {'priority': 0}, + 'prio1': {'priority': 1}, + 'prio2': {'priority': 2}, + 'prio3': {'priority': 3}, +}; From d3f4a2efbff78d639be6f3cd8cc225ffc3360001 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Jan 2025 15:17:37 +0100 Subject: [PATCH 05/13] Cleanup --- Cargo.toml | 1 + crates/core/src/bucket_priority.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 024b6c3..790a462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ default-members = ["crates/shell", "crates/sqlite"] [profile.dev] panic = "abort" +strip = true [profile.release] panic = "abort" diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs index 6a806bc..90d5148 100644 --- a/crates/core/src/bucket_priority.rs +++ b/crates/core/src/bucket_priority.rs @@ -10,7 +10,7 @@ pub struct BucketPriority(i32); impl BucketPriority { pub fn may_publish_with_outstanding_uploads(self) -> bool { - self.0 == 0 + self == BucketPriority::HIGHEST } pub const HIGHEST: BucketPriority = BucketPriority(0); From 58dfa94ec3d439c752c0f915b7bffe950256804d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Jan 2025 15:21:52 +0100 Subject: [PATCH 06/13] Revert unintentional changes --- dart/test/utils/native_test_utils.dart | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index 6481d4e..bc8b637 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -14,8 +14,7 @@ CommonDatabase openTestDatabase() { return DynamicLibrary.open('libsqlite3.so.0'); }); sqlite_open.open.overrideFor(sqlite_open.OperatingSystem.macOS, () { - return DynamicLibrary.open( - '/opt/homebrew/Cellar/sqlite/3.48.0/lib/libsqlite3.dylib'); + return DynamicLibrary.open('libsqlite3.dylib'); }); var lib = DynamicLibrary.open(getLibraryForPlatform(path: libPath)); var extension = SqliteExtension.inLibrary(lib, 'sqlite3_powersync_init'); @@ -24,6 +23,8 @@ CommonDatabase openTestDatabase() { } String getLibraryForPlatform({String? path = "."}) { + // Using an absolute path is required for macOS, where Dart can't dlopen + // relative paths due to being a "hardened program". 
return p.normalize(p.absolute(switch (Abi.current()) { Abi.androidArm || Abi.androidArm64 || Abi.androidX64 => '$path/libpowersync.so', From f3b62abc80751e8f196628c869f5c3f764d7846f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 3 Feb 2025 14:24:57 +0100 Subject: [PATCH 07/13] Check checksums partially --- crates/core/src/bucket_priority.rs | 5 ++++- crates/core/src/checkpoint.rs | 2 ++ crates/core/src/sync_types.rs | 1 + dart/test/sync_test.dart | 26 ++++++++++++++++++++------ 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs index 90d5148..0700c4e 100644 --- a/crates/core/src/bucket_priority.rs +++ b/crates/core/src/bucket_priority.rs @@ -24,7 +24,10 @@ impl TryFrom<i32> for BucketPriority { const VALID: RangeInclusive<i32> = (BucketPriority::HIGHEST.0)..=(BucketPriority::LOWEST.0); if !VALID.contains(&value) { - return Err(SQLiteError::from(ResultCode::MISUSE)); + return Err(SQLiteError( + ResultCode::MISUSE, + Some("Invalid bucket priority".into()), + )); } return Ok(BucketPriority(value)); diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 30cf11c..022520d 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -40,6 +40,8 @@ bucket_list(bucket, checksum) AS ( json_extract(json_each.value, '$.bucket') as bucket, json_extract(json_each.value, '$.checksum') as checksum FROM json_each(json_extract(?1, '$.buckets')) + WHERE IFNULL(json_extract(json_each.value, '$.priority'), 1) <= + IFNULL(json_extract(?1, '$.priority'), 3) ) SELECT bucket_list.bucket as bucket, diff --git a/crates/core/src/sync_types.rs b/crates/core/src/sync_types.rs index 429980d..060dd25 100644 --- a/crates/core/src/sync_types.rs +++ b/crates/core/src/sync_types.rs @@ -18,4 +18,5 @@ pub struct Checkpoint { pub struct BucketChecksum { pub bucket: String, pub checksum: i32, + pub priority: Option<i32>, } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index da8ae38..c9013b6 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -58,6 +58,7 @@ void main() { 'last_op_id': lastOpId, 'write_checkpoint': writeCheckpoint, 'buckets': checksums, + 'priority': priority, }) ]); @@ -88,7 +89,9 @@ void main() { pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); expect(fetchRows(), isEmpty); - expect(pushCheckpointComplete('1', null, [_bucketChecksum('prio1', 0)]), + expect( + pushCheckpointComplete( + '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), isTrue); expect(fetchRows(), [ {'id': 'row-0', 'col': 'hi'} ]); }); @@ -101,7 +104,9 @@ void main() { expect(fetchRows(), isNotEmpty); pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); - expect(pushCheckpointComplete('1', null, [_bucketChecksum('prio1', 0)]), + expect( + pushCheckpointComplete( + '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), isFalse); expect(fetchRows(), [ {'id': 'local', 'col': 'data'} ]); }); @@ -118,7 +123,7 @@ void main() { pushCheckpointComplete( '1', null, - [_bucketChecksum('prio0', 0)], + [_bucketChecksum('prio0', 0, checksum: 0)], priority: 0, ), isTrue, ); @@ -141,7 +146,16 @@ void main() { pushCheckpointComplete( '1', null, - [_bucketChecksum('prio$i', 0)], + [ + for (var j = 0; j <= 4; j++) + _bucketChecksum( + 'prio$j', + j, + // Give buckets outside of the current priority a wrong + // checksum. They should not be validated yet. + checksum: j <= i ? 0 : 1234, + ), + ], priority: i, ), isTrue, ); @@ -155,8 +169,8 @@ void main() { }); } -Object? _bucketChecksum(String bucket, int checksum) { - return {'bucket': bucket, 'checksum': checksum}; +Object? _bucketChecksum(String bucket, int prio, {int checksum = 0}) { + return {'bucket': bucket, 'priority': prio, 'checksum': checksum}; }
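For orientation between these two patches, it may help to see the payload shape that patch 07's bucket_list CTE implies. The sketch below is assembled from the Dart test above and is illustrative only (the bucket names, checksums and priority cutoff are assumptions taken from the test fixtures, not a normative API): it shows the JSON handed to powersync_validate_checkpoint when validating a partial checkpoint up to priority 1, where buckets with a larger priority number (a lower priority) are excluded from checksum validation.

    // Rust sketch, not part of the patches.
    let checkpoint_json = r#"{
        "last_op_id": "1",
        "write_checkpoint": null,
        "priority": 1,
        "buckets": [
            {"bucket": "prio0", "priority": 0, "checksum": 0},
            {"bucket": "prio1", "priority": 1, "checksum": 0},
            {"bucket": "prio3", "priority": 3, "checksum": 1234}
        ]
    }"#;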
_bucketChecksum(String bucket, int checksum) { - return {'bucket': bucket, 'checksum': checksum}; +Object? _bucketChecksum(String bucket, int prio, {int checksum = 0}) { + return {'bucket': bucket, 'priority': prio, 'checksum': checksum}; } const _schema = { From 7a3df3ee76b62e1526090848da763e0c4dee8ae4 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 3 Feb 2025 16:02:48 +0100 Subject: [PATCH 08/13] Don't store priorities --- Cargo.toml | 1 - crates/core/src/bucket_priority.rs | 48 ++++++++++-- crates/core/src/migrations.rs | 16 ---- crates/core/src/operations.rs | 19 +---- crates/core/src/operations_vtab.rs | 2 +- crates/core/src/sync_local.rs | 99 ++++++++++++++++--------- dart/test/sync_test.dart | 12 ++- dart/test/utils/fix_035_fixtures.dart | 12 +-- dart/test/utils/migration_fixtures.dart | 56 +------------- 9 files changed, 127 insertions(+), 138 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 790a462..024b6c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ default-members = ["crates/shell", "crates/sqlite"] [profile.dev] panic = "abort" -strip = true [profile.release] panic = "abort" diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs index 0700c4e..02ef496 100644 --- a/crates/core/src/bucket_priority.rs +++ b/crates/core/src/bucket_priority.rs @@ -1,5 +1,6 @@ use core::ops::RangeInclusive; +use serde::{de::Visitor, Deserialize}; use sqlite_nostd::ResultCode; use crate::error::SQLiteError; @@ -34,12 +35,6 @@ impl TryFrom for BucketPriority { } } -impl Default for BucketPriority { - fn default() -> Self { - Self(1) - } -} - impl Into for BucketPriority { fn into(self) -> i32 { self.0 @@ -51,3 +46,44 @@ impl PartialOrd for BucketPriority { Some(self.0.partial_cmp(&other.0)?.reverse()) } } + +impl<'de> Deserialize<'de> for BucketPriority { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct PriorityVisitor; + impl<'de> Visitor<'de> for PriorityVisitor { + type Value = BucketPriority; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + formatter.write_str("a priority as an integer between 0 and 3 (inclusive)") + } + + fn visit_i32(self, v: i32) -> Result + where + E: serde::de::Error, + { + BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default())) + } + + fn visit_i64(self, v: i64) -> Result + where + E: serde::de::Error, + { + let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?; + Self::visit_i32(self, i) + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?; + Self::visit_i32(self, i) + } + } + + deserializer.deserialize_i32(PriorityVisitor) + } +} diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 157944a..bd999cd 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -310,21 +310,5 @@ json_array( .into_db_result(local_db)?; } - if current_version < 7 && target_version >= 7 { - local_db - .exec_safe( - "\ -ALTER TABLE ps_buckets ADD COLUMN priority INTEGER NOT NULL DEFAULT 1; -INSERT INTO ps_migration(id, down_migrations) -VALUES(7, -json_array( - json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN priority'), - json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7') -)); -", - ) - .into_db_result(local_db)?; - } - Ok(()) } diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index faa2472..8b64a3e 100644 --- a/crates/core/src/operations.rs 
+++ b/crates/core/src/operations.rs @@ -1,7 +1,6 @@ use alloc::format; use alloc::string::String; -use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; @@ -18,8 +17,7 @@ SELECT json_extract(e.value, '$.data') as data, json_extract(e.value, '$.has_more') as has_more, json_extract(e.value, '$.after') as after, - json_extract(e.value, '$.next_after') as next_after, - json_extract(d.value, '$.priority') as priority + json_extract(e.value, '$.next_after') as next_after FROM json_each(json_extract(?1, '$.buckets')) e LEFT OUTER JOIN json_each(json_extract(?1, '$.descriptions')) d ON json_extract(e.value, '$.bucket') == d.key", @@ -32,15 +30,8 @@ FROM json_each(json_extract(?1, '$.buckets')) e // let _has_more = statement.column_int(2)? != 0; // let _after = statement.column_text(3)?; // let _next_after = statement.column_text(4)?; - let priority = match statement.column_type(5)? { - sqlite_nostd::ColumnType::Integer => { - BucketPriority::try_from(statement.column_int(5)?).ok() - } - _ => None, - } - .unwrap_or_default(); - insert_bucket_operations(db, bucket, data, priority)?; + insert_bucket_operations(db, bucket, data)?; } Ok(()) @@ -50,7 +41,6 @@ pub fn insert_bucket_operations( db: *mut sqlite::sqlite3, bucket: &str, data: &str, - priority: BucketPriority, ) -> Result<(), SQLiteError> { // Statement to insert new operations (only for PUT and REMOVE). // language=SQLite @@ -72,14 +62,13 @@ FROM json_each(?) e", // We can consider splitting this into separate SELECT and INSERT statements. // language=SQLite let bucket_statement = db.prepare_v2( - "INSERT INTO ps_buckets(name, priority) - VALUES(?, ?) + "INSERT INTO ps_buckets(name) + VALUES(?) ON CONFLICT DO UPDATE SET last_applied_op = last_applied_op RETURNING id, last_applied_op", )?; bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - bucket_statement.bind_int(2, priority.into())?; bucket_statement.step()?; let bucket_id = bucket_statement.column_int64(0)?; diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs index 4ac441b..8746355 100644 --- a/crates/core/src/operations_vtab.rs +++ b/crates/core/src/operations_vtab.rs @@ -84,7 +84,7 @@ extern "C" fn update( let result = insert_operation(db, args[3].text()); vtab_result(vtab, result) } else if op == "sync_local" { - let result = sync_local(db, &args[3]); + let result = sync_local(db, args[3]); if let Ok(result_row) = result { unsafe { *p_row_id = result_row; diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index f24c0ba..249f538 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -1,6 +1,8 @@ use alloc::collections::BTreeSet; use alloc::format; use alloc::string::String; +use alloc::vec::Vec; +use serde::Deserialize; use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; @@ -14,30 +16,25 @@ fn can_apply_sync_changes( db: *mut sqlite::sqlite3, priority: BucketPriority, ) -> Result<bool, SQLiteError> { - // We can only make sync changes visible if data is consistent, meaning that we've seen the - // target operation sent in the original checkpoint message. We allow weakening consistency when - // buckets from different priorities are involved (buckets with higher priorities or a lower - // priority number can be published before we've reached the checkpoint for other buckets). 
- // language=SQLite - let statement = db.prepare_v2( - "\ + // Don't publish downloaded data until the upload queue is empty (except for downloaded data in + // priority 0, which is published earlier). + if !priority.may_publish_with_outstanding_uploads() { + // language=SQLite + let statement = db.prepare_v2( + "\ SELECT group_concat(name) FROM ps_buckets -WHERE (target_op > last_op) AND (priority <= ?)", - )?; - statement.bind_int(1, priority.into())?; +WHERE target_op > last_op AND name = '$local'", + )?; - if statement.step()? != ResultCode::ROW { - return Err(SQLiteError::from(ResultCode::ABORT)); - } + if statement.step()? != ResultCode::ROW { + return Err(SQLiteError::from(ResultCode::ABORT)); + } - if statement.column_type(0)? == ColumnType::Text { - return Ok(false); - } + if statement.column_type(0)? == ColumnType::Text { + return Ok(false); + } - // Don't publish downloaded data until the upload queue is empty (except for downloaded data in - // priority 0, which is published earlier). - if !priority.may_publish_with_outstanding_uploads() { let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?; if statement.step()? != ResultCode::DONE { return Ok(false); @@ -47,13 +44,27 @@ WHERE (target_op > last_op) AND (priority <= ?)", Ok(true) } -pub fn sync_local(db: *mut sqlite::sqlite3, data: &V) -> Result { - let priority = match data.value_type() { - ColumnType::Integer => BucketPriority::try_from(data.int()), - ColumnType::Float => BucketPriority::try_from(data.double() as i32), - // Older clients without bucket priority support typically send an empty string here. - _ => Ok(BucketPriority::LOWEST), - }?; +pub fn sync_local(db: *mut sqlite::sqlite3, data: *mut sqlite::value) -> Result { + #[derive(Deserialize)] + struct SyncLocalArguments { + #[serde(rename = "buckets")] + _buckets: Vec, + priority: Option, + } + + const FALLBACK_PRIORITY: BucketPriority = BucketPriority::LOWEST; + let (has_args, priority) = match data.value_type() { + ColumnType::Text => { + let text = data.text(); + if text.len() > 0 { + let args: SyncLocalArguments = serde_json::from_str(text)?; + (true, args.priority.unwrap_or(FALLBACK_PRIORITY)) + } else { + (false, FALLBACK_PRIORITY) + } + } + _ => (false, FALLBACK_PRIORITY), + }; if !can_apply_sync_changes(db, priority)? { return Ok(0); @@ -78,12 +89,17 @@ pub fn sync_local(db: *mut sqlite::sqlite3, data: &V) -> Result buckets.last_applied_op) - WHERE buckets.priority <= ?1 - UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows -) +WITH + involved_buckets (id) AS ( + SELECT id FROM ps_buckets WHERE ?1 IS NULL + OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))) + ), + updated_rows AS ( + SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op) + WHERE buckets.id IN (SELECT id FROM involved_buckets) + UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows + ) -- 3. Group the objects from different buckets together into a single one (ops). 
SELECT b.row_type as type, @@ -98,15 +114,19 @@ FROM updated_rows b LEFT OUTER JOIN ps_oplog AS r ON r.row_type = b.row_type AND r.row_id = b.row_id - AND (SELECT priority FROM ps_buckets WHERE id = r.bucket) <= ?1 + AND r.bucket IN (SELECT id FROM involved_buckets) -- Group for (3) GROUP BY b.row_type, b.row_id", ) .into_db_result(db)?; - // TODO: cache statements + if has_args { + statement.bind_value(1, data)?; + } else { + statement.bind_null(1)?; + } - statement.bind_int(1, priority.into())?; + // TODO: cache statements while statement.step().into_db_result(db)? == ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; @@ -170,10 +190,15 @@ GROUP BY b.row_type, b.row_id", .prepare_v2( "UPDATE ps_buckets SET last_applied_op = last_op - WHERE last_applied_op != last_op AND priority <= ?", + WHERE last_applied_op != last_op AND + (?1 IS NULL OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))))", ) .into_db_result(db)?; - updated.bind_int(1, priority.into())?; + if has_args { + updated.bind_value(1, data)?; + } else { + updated.bind_null(1)?; + } updated.exec()?; if priority == BucketPriority::LOWEST { diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index c9013b6..fe95733 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -75,8 +75,16 @@ void main() { ], ); - db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', - ['sync_local', priority]); + db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [ + 'sync_local', + jsonEncode({ + 'priority': priority, + 'buckets': [ + for (final cs in checksums.cast>()) + if (priority == null || cs['priority'] <= priority) cs['bucket'] + ], + }) + ]); return db.lastInsertRowId == 1; } diff --git a/dart/test/utils/fix_035_fixtures.dart b/dart/test/utils/fix_035_fixtures.dart index 78cba7e..22fc581 100644 --- a/dart/test/utils/fix_035_fixtures.dart +++ b/dart/test/utils/fix_035_fixtures.dart @@ -18,9 +18,9 @@ const dataBroken = ''' /// Data after applying the migration fix, but before sync_local const dataMigrated = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0, 1), - (2, 'b2', 0, 0, 0, 0, 3, 0, 1) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0), + (2, 'b2', 0, 0, 0, 0, 3, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), @@ -39,9 +39,9 @@ const dataMigrated = ''' /// Data after applying the migration fix and sync_local const dataFixed = ''' -;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0, 1), - (2, 'b2', 0, 0, 0, 0, 3, 0, 1) +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0), + (2, 'b2', 0, 0, 0, 0, 3, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20), diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index c9f387b..3797163 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 
@@ /// The current database version -const databaseVersion = 7; +const databaseVersion = 6; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. @@ -172,46 +172,6 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') -''', - 7: r''' -;CREATE TABLE ps_buckets( - id INTEGER PRIMARY KEY, - name TEXT NOT NULL, - last_applied_op INTEGER NOT NULL DEFAULT 0, - last_op INTEGER NOT NULL DEFAULT 0, - target_op INTEGER NOT NULL DEFAULT 0, - add_checksum INTEGER NOT NULL DEFAULT 0, - op_checksum INTEGER NOT NULL DEFAULT 0, - pending_delete INTEGER NOT NULL DEFAULT 0 -, priority INTEGER NOT NULL DEFAULT 1) STRICT -;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) -;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) -;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) -;CREATE TABLE ps_oplog( - bucket INTEGER NOT NULL, - op_id INTEGER NOT NULL, - row_type TEXT, - row_id TEXT, - key TEXT, - data TEXT, - hash INTEGER NOT NULL) STRICT -;CREATE 
TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) -;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) -;CREATE TABLE ps_updated_rows( - row_type TEXT, - row_id TEXT, - PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID -;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) -;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) -;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) -;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) -;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) -;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN priority"},{"sql":"DELETE FROM 
ps_migration WHERE id >= 7"}]')
 '''
 };
@@ -270,17 +230,6 @@ const data1 = {
     (2, 3, 'lists', 'l1', '', '{}', 3)
 ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
     ('lists', 'l2')
-''',
-  7: r'''
-;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES
-  (1, 'b1', 0, 0, 0, 0, 120, 0, 1),
-  (2, 'b2', 0, 0, 0, 1005, 3, 0, 1)
-;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
-  (1, 1, 'todos', 't1', '', '{}', 100),
-  (1, 2, 'todos', 't2', '', '{}', 20),
-  (2, 3, 'lists', 'l1', '', '{}', 3)
-;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
-  ('lists', 'l2')
 '''
 };
@@ -321,8 +270,7 @@ final dataDown1 = {
     ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0),
     ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0)
 ''',
-  5: data1[5]!,
-  6: data1[5]!,
+  5: data1[5]!
 };
 
 final finalData1 = data1[databaseVersion]!;

From e8397303f1413b426dc8fe2358bf4449b9184e39 Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Mon, 3 Feb 2025 16:36:28 +0100
Subject: [PATCH 09/13] Revert unnecessary join

---
 crates/core/src/operations.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs
index 8b64a3e..2de3afe 100644
--- a/crates/core/src/operations.rs
+++ b/crates/core/src/operations.rs
@@ -18,9 +18,7 @@ SELECT
     json_extract(e.value, '$.has_more') as has_more,
     json_extract(e.value, '$.after') as after,
     json_extract(e.value, '$.next_after') as next_after
-FROM json_each(json_extract(?1, '$.buckets')) e
-    LEFT OUTER JOIN json_each(json_extract(?1, '$.descriptions')) d
-        ON json_extract(e.value, '$.bucket') == d.key",
+FROM json_each(json_extract(?1, '$.buckets')) e",
 )?;
 statement.bind_text(1, data, sqlite::Destructor::STATIC)?;

From e1cc3d5c0da723eef9ea6e3c9067e4b7c43e3afd Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Tue, 4 Feb 2025 13:46:03 +0100
Subject: [PATCH 10/13] Refactor sync_local

---
 Cargo.toml                         |   1 +
 crates/core/src/bucket_priority.rs |   7 +-
 crates/core/src/checkpoint.rs      |   2 -
 crates/core/src/operations_vtab.rs |   2 +-
 crates/core/src/sync_local.rs      | 382 ++++++++++++++++++-----------
 crates/core/src/view_admin.rs      |   2 +-
 dart/test/sync_test.dart           |  23 +-
 7 files changed, 254 insertions(+), 165 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 024b6c3..790a462 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ default-members = ["crates/shell", "crates/sqlite"]
 
 [profile.dev]
 panic = "abort"
+strip = true
 
 [profile.release]
 panic = "abort"
diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs
index 02ef496..166a28c 100644
--- a/crates/core/src/bucket_priority.rs
+++ b/crates/core/src/bucket_priority.rs
@@ -1,5 +1,3 @@
-use core::ops::RangeInclusive;
-
 use serde::{de::Visitor, Deserialize};
 use sqlite_nostd::ResultCode;
 
@@ -15,16 +13,13 @@ impl BucketPriority {
     }
 
     pub const HIGHEST: BucketPriority = BucketPriority(0);
-    pub const LOWEST: BucketPriority = BucketPriority(3);
 }
 
 impl TryFrom<i32> for BucketPriority {
     type Error = SQLiteError;
 
     fn try_from(value: i32) -> Result<Self, Self::Error> {
-        const VALID: RangeInclusive<i32> = (BucketPriority::HIGHEST.0)..=(BucketPriority::LOWEST.0);
-
-        if !VALID.contains(&value) {
+        if value < BucketPriority::HIGHEST.0 {
             return Err(SQLiteError(
                 ResultCode::MISUSE,
                 Some("Invalid bucket priority".into()),
diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs
index 022520d..30cf11c 100644
--- a/crates/core/src/checkpoint.rs
+++ b/crates/core/src/checkpoint.rs
@@ -40,8 +40,6 @@
bucket_list(bucket, checksum) AS (
     json_extract(json_each.value, '$.bucket') as bucket,
     json_extract(json_each.value, '$.checksum') as checksum
   FROM json_each(json_extract(?1, '$.buckets'))
-  WHERE IFNULL(json_extract(json_each.value, '$.priority'), 1) <=
-    IFNULL(json_extract(?1, '$.priority'), 3)
 )
 SELECT
   bucket_list.bucket as bucket,
diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs
index 8746355..4ac441b 100644
--- a/crates/core/src/operations_vtab.rs
+++ b/crates/core/src/operations_vtab.rs
@@ -84,7 +84,7 @@ extern "C" fn update(
         let result = insert_operation(db, args[3].text());
         vtab_result(vtab, result)
     } else if op == "sync_local" {
-        let result = sync_local(db, args[3]);
+        let result = sync_local(db, &args[3]);
         if let Ok(result_row) = result {
             unsafe {
                 *p_row_id = result_row;
diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs
index 249f538..527e2bb 100644
--- a/crates/core/src/sync_local.rs
+++ b/crates/core/src/sync_local.rs
@@ -6,91 +6,223 @@ use serde::Deserialize;
 
 use crate::bucket_priority::BucketPriority;
 use crate::error::{PSResult, SQLiteError};
-use sqlite_nostd::{self as sqlite, Value};
+use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
 use sqlite_nostd::{ColumnType, Connection, ResultCode};
 
 use crate::ext::SafeManagedStmt;
 use crate::util::{internal_table_name, quote_internal_name};
 
-fn can_apply_sync_changes(
-    db: *mut sqlite::sqlite3,
+pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> {
+    let mut operation = SyncOperation::new(db, data)?;
+    operation.apply()
+}
+
+struct PartialSyncOperation<'a> {
+    /// The lowest priority part of the partial sync operation.
     priority: BucketPriority,
-) -> Result<bool, SQLiteError> {
-    // Don't publish downloaded data until the upload queue is empty (except for downloaded data in
-    // priority 0, which is published earlier).
-    if !priority.may_publish_with_outstanding_uploads() {
-        // language=SQLite
-        let statement = db.prepare_v2(
-            "\
-SELECT group_concat(name)
-FROM ps_buckets
-WHERE target_op > last_op AND name = '$local'",
-        )?;
+    /// The JSON-encoded arguments passed by the client SDK. This includes the priority and a list
+    /// of bucket names in that (and higher) priorities.
+    args: &'a str,
+}
 
-        if statement.step()? != ResultCode::ROW {
-            return Err(SQLiteError::from(ResultCode::ABORT));
-        }
+struct SyncOperation<'a> {
+    db: *mut sqlite::sqlite3,
+    data_tables: BTreeSet<String>,
+    partial: Option<PartialSyncOperation<'a>>,
+}
 
-        if statement.column_type(0)? == ColumnType::Text {
-            return Ok(false);
-        }
+impl<'a> SyncOperation<'a> {
+    fn new<V: Value>(db: *mut sqlite::sqlite3, data: &'a V) -> Result<Self, SQLiteError> {
+        return Ok(Self {
+            db: db,
+            data_tables: BTreeSet::new(),
+            partial: match data.value_type() {
+                ColumnType::Text => {
+                    let text = data.text();
+                    if text.len() > 0 {
+                        #[derive(Deserialize)]
+                        struct PartialSyncLocalArguments {
+                            #[serde(rename = "buckets")]
+                            _buckets: Vec<String>,
+                            priority: BucketPriority,
+                        }
+
+                        let args: PartialSyncLocalArguments = serde_json::from_str(text)?;
+                        Some(PartialSyncOperation {
+                            priority: args.priority,
+                            args: text,
+                        })
+                    } else {
+                        None
+                    }
+                }
+                _ => None,
+            },
+        });
+    }
 
-        let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
-        if statement.step()? != ResultCode::DONE {
-            return Ok(false);
+    fn can_apply_sync_changes(&self) -> Result<bool, SQLiteError> {
+        // Don't publish downloaded data until the upload queue is empty (except for downloaded data
+        // in priority 0, which is published earlier).
+
+        let needs_check = match &self.partial {
+            Some(p) => !p.priority.may_publish_with_outstanding_uploads(),
+            None => true,
+        };
+
+        if needs_check {
+            // language=SQLite
+            let statement = self.db.prepare_v2(
+                "\
+                SELECT group_concat(name)
+                FROM ps_buckets
+                WHERE target_op > last_op AND name = '$local'",
+            )?;
+
+            if statement.step()? != ResultCode::ROW {
+                return Err(SQLiteError::from(ResultCode::ABORT));
+            }
+
+            if statement.column_type(0)? == ColumnType::Text {
+                return Ok(false);
+            }
+
+            let statement = self.db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
+            if statement.step()? != ResultCode::DONE {
+                return Ok(false);
+            }
         }
-    }
 
-    Ok(true)
-}
+        Ok(true)
+    }
 
-pub fn sync_local(db: *mut sqlite::sqlite3, data: *mut sqlite::value) -> Result<i64, SQLiteError> {
-    #[derive(Deserialize)]
-    struct SyncLocalArguments {
-        #[serde(rename = "buckets")]
-        _buckets: Vec<String>,
-        priority: Option<BucketPriority>,
-    }
+    fn apply(&mut self) -> Result<i64, SQLiteError> {
+        if !self.can_apply_sync_changes()? {
+            return Ok(0);
+        }
 
-    const FALLBACK_PRIORITY: BucketPriority = BucketPriority::LOWEST;
-    let (has_args, priority) = match data.value_type() {
-        ColumnType::Text => {
-            let text = data.text();
-            if text.len() > 0 {
-                let args: SyncLocalArguments = serde_json::from_str(text)?;
-                (true, args.priority.unwrap_or(FALLBACK_PRIORITY))
-            } else {
-                (false, FALLBACK_PRIORITY)
-            }
-        }
-        _ => (false, FALLBACK_PRIORITY),
-    };
+        self.collect_tables()?;
+        let statement = self.collect_full_operations()?;
+        // TODO: cache statements
+        while statement.step().into_db_result(self.db)? == ResultCode::ROW {
+            let type_name = statement.column_text(0)?;
+            let id = statement.column_text(1)?;
+            let buckets = statement.column_int(3)?;
+            let data = statement.column_text(2);
+
+            let table_name = internal_table_name(type_name);
+
+            if self.data_tables.contains(&table_name) {
+                let quoted = quote_internal_name(type_name, false);
+
+                if buckets == 0 {
+                    // DELETE
+                    let delete_statement = self
+                        .db
+                        .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
+                        .into_db_result(self.db)?;
+                    delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
+                    delete_statement.exec()?;
+                } else {
+                    // INSERT/UPDATE
+                    let insert_statement = self
+                        .db
+                        .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
+                        .into_db_result(self.db)?;
+                    insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.exec()?;
+                }
+            } else {
+                if buckets == 0 {
+                    // DELETE
+                    // language=SQLite
+                    let delete_statement = self
+                        .db
+                        .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
+                        .into_db_result(self.db)?;
+                    delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
+                    delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
+                    delete_statement.exec()?;
+                } else {
+                    // INSERT/UPDATE
+                    // language=SQLite
+                    let insert_statement = self
+                        .db
+                        .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)")
+                        .into_db_result(self.db)?;
+                    insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.exec()?;
+                }
+            }
+        }
 
-    if !can_apply_sync_changes(db, priority)? {
-        return Ok(0);
+        self.set_last_applied_op()?;
+        self.mark_completed()?;
+
+        Ok(1)
     }
 
-    // language=SQLite
-    let statement = db
-        .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'")
-        .into_db_result(db)?;
-    let mut tables: BTreeSet<String> = BTreeSet::new();
+    fn collect_tables(&mut self) -> Result<(), SQLiteError> {
+        // language=SQLite
+        let statement = self
+            .db
+            .prepare_v2(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'",
+            )
+            .into_db_result(self.db)?;
 
-    while statement.step()? == ResultCode::ROW {
-        let name = statement.column_text(0)?;
-        tables.insert(String::from(name));
+        while statement.step()? == ResultCode::ROW {
+            let name = statement.column_text(0)?;
+            self.data_tables.insert(String::from(name));
+        }
+        Ok(())
     }
 
-    // Query for updated objects
+    fn collect_full_operations(&self) -> Result<ManagedStmt, SQLiteError> {
+        Ok(match &self.partial {
+            None => {
+                // Complete sync
+                self.db
+                    .prepare_v2(
+                        "\
+-- 1. Filter oplog by the ops added but not applied yet (oplog b).
+-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
+WITH updated_rows AS (
+    SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
+        CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
+        AND (b.op_id > buckets.last_applied_op)
+    UNION SELECT row_type, row_id FROM ps_updated_rows
+)
 
-    // language=SQLite
-    let statement = db
-        .prepare_v2(
-            "\
+-- 3. Group the objects from different buckets together into a single one (ops).
+SELECT b.row_type as type,
+       b.row_id as id,
+       r.data as data,
+       count(r.bucket) as buckets,
+       /* max() affects which row is used for 'data' */
+       max(r.op_id) as op_id
+-- 2. Find *all* current ops over different buckets for those objects (oplog r).
+FROM updated_rows b
+    LEFT OUTER JOIN ps_oplog AS r
+        ON r.row_type = b.row_type
+        AND r.row_id = b.row_id
+-- Group for (3)
+GROUP BY b.row_type, b.row_id",
+                    )
+                    .into_db_result(self.db)?
+            }
+            Some(partial) => {
+                let stmt = self
+                    .db
+                    .prepare_v2(
+                        "\
 -- 1. Filter oplog by the ops added but not applied yet (oplog b).
 -- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
 WITH
-  involved_buckets (id) AS (
+  involved_buckets (id) AS MATERIALIZED (
     SELECT id FROM ps_buckets WHERE ?1 IS NULL
       OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
   ),
@@ -98,13 +230,11 @@
   updated_rows AS (
     SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
       CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
     WHERE buckets.id IN (SELECT id FROM involved_buckets)
-    UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
   )
 
 -- 3. Group the objects from different buckets together into a single one (ops).
 SELECT b.row_type as type,
        b.row_id as id,
-       b.local as local,
        r.data as data,
        count(r.bucket) as buckets,
        /* max() affects which row is used for 'data' */
@@ -117,101 +247,61 @@ FROM updated_rows b
         AND r.bucket IN (SELECT id FROM involved_buckets)
 -- Group for (3)
 GROUP BY b.row_type, b.row_id",
-        )
-        .into_db_result(db)?;
+                    )
+                    .into_db_result(self.db)?;
+                stmt.bind_text(1, partial.args, Destructor::STATIC)?;
 
-    if has_args {
-        statement.bind_value(1, data)?;
-    } else {
-        statement.bind_null(1)?;
+                stmt
+            }
+        })
     }
 
-    // TODO: cache statements
-    while statement.step().into_db_result(db)? == ResultCode::ROW {
-        let type_name = statement.column_text(0)?;
-        let id = statement.column_text(1)?;
-        let local = statement.column_int(2)?
== 1; - let buckets = statement.column_int(4)?; - let data = statement.column_text(3); - - let table_name = internal_table_name(type_name); - - if local && buckets == 0 && priority == BucketPriority::HIGHEST { - // These rows are still local and they haven't been uploaded yet (which we allow for - // buckets with priority=0 completing). We should just keep them around. - continue; - } - - if tables.contains(&table_name) { - let quoted = quote_internal_name(type_name, false); - - if buckets == 0 { - // DELETE - let delete_statement = db - .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) - .into_db_result(db)?; - delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; - delete_statement.exec()?; - } else { - // INSERT/UPDATE - let insert_statement = db - .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted)) - .into_db_result(db)?; - insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?; - insert_statement.exec()?; - } - } else { - if buckets == 0 { - // DELETE + fn set_last_applied_op(&self) -> Result<(), SQLiteError> { + match &self.partial { + Some(partial) => { // language=SQLite - let delete_statement = db - .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?") - .into_db_result(db)?; - delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; - delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - delete_statement.exec()?; - } else { - // INSERT/UPDATE + let updated = self + .db + .prepare_v2( "\ + UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op AND + name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))", + ) + .into_db_result(self.db)?; + updated.bind_text(1, partial.args, Destructor::STATIC)?; + updated.exec()?; + } + None => { // language=SQLite - let insert_statement = db - .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)") - .into_db_result(db)?; - insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?; - insert_statement.exec()?; + self.db + .exec_safe( + "UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op", + ) + .into_db_result(self.db)?; } } - } - // language=SQLite - let updated = db - .prepare_v2( - "UPDATE ps_buckets - SET last_applied_op = last_op - WHERE last_applied_op != last_op AND - (?1 IS NULL OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))))", - ) - .into_db_result(db)?; - if has_args { - updated.bind_value(1, data)?; - } else { - updated.bind_null(1)?; + Ok(()) } - updated.exec()?; - if priority == BucketPriority::LOWEST { - // language=SQLite - db.exec_safe("DELETE FROM ps_updated_rows") - .into_db_result(db)?; + fn mark_completed(&self) -> Result<(), SQLiteError> { + if self.partial.is_none() { + // language=SQLite + self.db + .exec_safe("DELETE FROM ps_updated_rows") + .into_db_result(self.db)?; - // language=SQLite - db.exec_safe( - "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())", - ) - .into_db_result(db)?; - } + // language=SQLite + self.db + .exec_safe( + "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())", + ) + .into_db_result(self.db)?; + } - Ok(1) + Ok(()) + } } diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 16f7a7f..b58ebaf 100644 --- 
a/crates/core/src/view_admin.rs
+++ b/crates/core/src/view_admin.rs
@@ -120,7 +120,7 @@ fn powersync_init_impl(
 
     setup_internal_views(local_db)?;
 
-    powersync_migrate(ctx, 7)?;
+    powersync_migrate(ctx, 6)?;
 
     Ok(String::from(""))
 }
diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart
index fe95733..b088485 100644
--- a/dart/test/sync_test.dart
+++ b/dart/test/sync_test.dart
@@ -57,14 +57,17 @@ void main() {
       json.encode({
         'last_op_id': lastOpId,
         'write_checkpoint': writeCheckpoint,
-        'buckets': checksums,
+        'buckets': [
+          for (final cs in checksums.cast<Map<String, dynamic>>())
+            if (priority == null || cs['priority'] <= priority) cs
+        ],
         'priority': priority,
       })
     ]);
 
     final decoded = json.decode(row['r']);
     if (decoded['valid'] != true) {
-      fail(decoded);
+      fail(row['r']);
     }
 
     db.execute(
@@ -77,13 +80,15 @@ void main() {
 
     db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [
       'sync_local',
-      jsonEncode({
-        'priority': priority,
-        'buckets': [
-          for (final cs in checksums.cast<Map<String, dynamic>>())
-            if (priority == null || cs['priority'] <= priority) cs['bucket']
-        ],
-      })
+      priority != null
+          ? jsonEncode({
+              'priority': priority,
+              'buckets': [
+                for (final cs in checksums.cast<Map<String, dynamic>>())
+                  if (cs['priority'] <= priority) cs['bucket']
+              ],
+            })
+          : null,
     ]);
     return db.lastInsertRowId == 1;
   }

From 56b4d8e94ed601187822b0419a02c1ec6a9cce2d Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Wed, 5 Feb 2025 16:46:51 +0100
Subject: [PATCH 11/13] Introduce table for partial sync completions

---
 crates/core/src/kv.rs                   |  7 +--
 crates/core/src/migrations.rs           | 23 +++++++++
 crates/core/src/sync_local.rs           | 43 ++++++++++++-----
 crates/core/src/view_admin.rs           |  2 +-
 dart/test/sync_test.dart                |  3 ++
 dart/test/utils/migration_fixtures.dart | 62 +++++++++++++++++++++++--
 6 files changed, 121 insertions(+), 19 deletions(-)

diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs
index 70b5bd8..4a7fa44 100644
--- a/crates/core/src/kv.rs
+++ b/crates/core/src/kv.rs
@@ -46,13 +46,14 @@ fn powersync_last_synced_at_impl(
     let db = ctx.db_handle();
 
     // language=SQLite
-    let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
+    let statement =
+        db.prepare_v2("select last_synced_at from ps_sync_state where priority = -1")?;
 
     if statement.step()?
== ResultCode::ROW { let client_id = statement.column_text(0)?; - return Ok(Some(client_id.to_string())); + Ok(Some(client_id.to_string())) } else { - return Ok(None); + Ok(None) } } diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index bd999cd..57399a7 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -310,5 +310,28 @@ json_array( .into_db_result(local_db)?; } + if current_version < 7 && target_version >= 7 { + local_db + .exec_safe( + "\ +CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL, + last_synced_at TEXT NOT NULL +) STRICT; +INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at) + SELECT -1, value from ps_kv where key = 'last_synced_at'; + +INSERT INTO ps_migration(id, down_migrations) +VALUES(7, +json_array( +json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = -1'), +json_object('sql', 'DROP TABLE ps_sync_state'), +json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7') +)); +", + ) + .into_db_result(local_db)?; + } + Ok(()) } diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 527e2bb..76a8d7a 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -1,4 +1,7 @@ +use core::ffi::c_void; + use alloc::collections::BTreeSet; +use alloc::fmt::format; use alloc::format; use alloc::string::String; use alloc::vec::Vec; @@ -288,19 +291,35 @@ GROUP BY b.row_type, b.row_id", } fn mark_completed(&self) -> Result<(), SQLiteError> { - if self.partial.is_none() { - // language=SQLite - self.db - .exec_safe("DELETE FROM ps_updated_rows") - .into_db_result(self.db)?; + let priority_code = match &self.partial { + None => { + // language=SQLite + self.db + .exec_safe("DELETE FROM ps_updated_rows") + .into_db_result(self.db)?; + -1 + } + Some(partial) => partial.priority.into(), + }; - // language=SQLite - self.db - .exec_safe( - "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())", - ) - .into_db_result(self.db)?; - } + // Higher-priority buckets are always part of lower-priority sync operations too, so we can + // delete information about higher-priority syncs (represented as lower priority numbers). + // A complete sync is represented as -1. 
+ // language=SQLite + let stmt = self + .db + .prepare_v2("DELETE FROM ps_sync_state WHERE (priority < ?1) OR (?1 = -1);") + .into_db_result(self.db)?; + stmt.bind_int(1, priority_code)?; + stmt.exec()?; + + // language=SQLite + let stmt = self + .db + .prepare_v2("INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, datetime());") + .into_db_result(self.db)?; + stmt.bind_int(1, priority_code)?; + stmt.exec()?; Ok(()) } diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index b58ebaf..16f7a7f 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -120,7 +120,7 @@ fn powersync_init_impl( setup_internal_views(local_db)?; - powersync_migrate(ctx, 6)?; + powersync_migrate(ctx, 7)?; Ok(String::from("")) } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index b088485..f4f367c 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -177,6 +177,9 @@ void main() { expect(fetchRows(), [ for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'}, ]); + + expect(db.select('select 1 from ps_sync_state where priority = ?', [i]), + isNotEmpty); } }); }); diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 3797163..a60ae00 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 6; +const databaseVersion = 7; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. @@ -172,7 +172,51 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON 
ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') -''' +''', + 7: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM 
ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = -1"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +''', }; final finalState = expectedState[databaseVersion]!; @@ -230,6 +274,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 7: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -270,7 +325,8 @@ final dataDown1 = { ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0) ''', - 5: data1[5]! + 5: data1[5]!, + 6: data1[5]! 
};

final finalData1 = data1[databaseVersion]!;

From fe90f9c7152587468174da92ba3be879040f93f1 Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Tue, 11 Feb 2025 13:44:07 +0100
Subject: [PATCH 12/13] Represent complete sync with a low-priority sentinel

---
 Cargo.toml                              |  1 -
 crates/core/src/bucket_priority.rs      | 17 +++++++++++------
 crates/core/src/kv.rs                   |  5 +++--
 crates/core/src/migrations.rs           | 16 ++++++++--------
 crates/core/src/sync_local.rs           | 16 +++++++---------
 dart/test/utils/migration_fixtures.dart |  2 +-
 6 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 790a462..024b6c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,7 +9,6 @@ default-members = ["crates/shell", "crates/sqlite"]
 
 [profile.dev]
 panic = "abort"
-strip = true
 
 [profile.release]
 panic = "abort"
diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs
index 166a28c..454f1fe 100644
--- a/crates/core/src/bucket_priority.rs
+++ b/crates/core/src/bucket_priority.rs
@@ -5,40 +5,45 @@ use crate::error::SQLiteError;
 
 #[repr(transparent)]
 #[derive(Clone, Copy, PartialEq, Eq)]
-pub struct BucketPriority(i32);
+pub struct BucketPriority {
+    pub number: i32,
+}
 
 impl BucketPriority {
     pub fn may_publish_with_outstanding_uploads(self) -> bool {
         self == BucketPriority::HIGHEST
     }
 
-    pub const HIGHEST: BucketPriority = BucketPriority(0);
+    pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };
+
+    /// A low priority used to represent fully-completed sync operations across all priorities.
+    pub const SENTINEL: BucketPriority = BucketPriority { number: i32::MAX };
 }
 
 impl TryFrom<i32> for BucketPriority {
     type Error = SQLiteError;
 
     fn try_from(value: i32) -> Result<Self, Self::Error> {
-        if value < BucketPriority::HIGHEST.0 {
+        if value < BucketPriority::HIGHEST.number || value == Self::SENTINEL.number {
             return Err(SQLiteError(
                 ResultCode::MISUSE,
                 Some("Invalid bucket priority".into()),
             ));
         }
 
-        return Ok(BucketPriority(value));
+        return Ok(BucketPriority { number: value });
     }
 }
 
 impl Into<i32> for BucketPriority {
     fn into(self) -> i32 {
-        self.0
+        self.number
     }
 }
 
 impl PartialOrd for BucketPriority {
     fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
-        Some(self.0.partial_cmp(&other.0)?.reverse())
+        Some(self.number.partial_cmp(&other.number)?.reverse())
     }
 }
diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs
index 4a7fa44..c5b7bbc 100644
--- a/crates/core/src/kv.rs
+++ b/crates/core/src/kv.rs
@@ -9,6 +9,7 @@ use sqlite::ResultCode;
 use sqlite_nostd as sqlite;
 use sqlite_nostd::{Connection, Context};
 
+use crate::bucket_priority::BucketPriority;
 use crate::create_sqlite_optional_text_fn;
 use crate::create_sqlite_text_fn;
 use crate::error::SQLiteError;
@@ -46,8 +47,8 @@ fn powersync_last_synced_at_impl(
     let db = ctx.db_handle();
 
     // language=SQLite
-    let statement =
-        db.prepare_v2("select last_synced_at from ps_sync_state where priority = -1")?;
+    let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?;
+    statement.bind_int(1, BucketPriority::SENTINEL.into())?;
 
     if statement.step()?
== ResultCode::ROW { let client_id = statement.column_text(0)?; diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 57399a7..0bfbb03 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -8,6 +8,7 @@ use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; +use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; use crate::fix035::apply_v035_fix; @@ -311,26 +312,25 @@ json_array( } if current_version < 7 && target_version >= 7 { - local_db - .exec_safe( - "\ + const SENTINEL_PRIORITY: i32 = BucketPriority::SENTINEL.number; + let stmt = format!("\ CREATE TABLE ps_sync_state ( priority INTEGER NOT NULL, last_synced_at TEXT NOT NULL ) STRICT; INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at) - SELECT -1, value from ps_kv where key = 'last_synced_at'; + SELECT {}, value from ps_kv where key = 'last_synced_at'; INSERT INTO ps_migration(id, down_migrations) VALUES(7, json_array( -json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = -1'), +json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = {}'), json_object('sql', 'DROP TABLE ps_sync_state'), json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7') )); -", - ) - .into_db_result(local_db)?; +", SENTINEL_PRIORITY, SENTINEL_PRIORITY); + + local_db.exec_safe(&stmt).into_db_result(local_db)?; } Ok(()) diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 76a8d7a..89646f4 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -1,7 +1,4 @@ -use core::ffi::c_void; - use alloc::collections::BTreeSet; -use alloc::fmt::format; use alloc::format; use alloc::string::String; use alloc::vec::Vec; @@ -291,24 +288,25 @@ GROUP BY b.row_type, b.row_id", } fn mark_completed(&self) -> Result<(), SQLiteError> { - let priority_code = match &self.partial { + let priority_code: i32 = match &self.partial { None => { // language=SQLite self.db .exec_safe("DELETE FROM ps_updated_rows") .into_db_result(self.db)?; - -1 + BucketPriority::SENTINEL } - Some(partial) => partial.priority.into(), - }; + Some(partial) => partial.priority, + } + .into(); // Higher-priority buckets are always part of lower-priority sync operations too, so we can // delete information about higher-priority syncs (represented as lower priority numbers). - // A complete sync is represented as -1. + // A complete sync is represented by a number higher than the lowest priority we allow. 
// language=SQLite let stmt = self .db - .prepare_v2("DELETE FROM ps_sync_state WHERE (priority < ?1) OR (?1 = -1);") + .prepare_v2("DELETE FROM ps_sync_state WHERE priority < ?1;") .into_db_result(self.db)?; stmt.bind_int(1, priority_code)?; stmt.exec()?; diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index a60ae00..3cc0e2a 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -215,7 +215,7 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = -1"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') ''', }; 
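
Taken together, patches 11 and 12 move sync-completion bookkeeping from ps_kv into the
new ps_sync_state table, keyed by priority, with BucketPriority::SENTINEL (i32::MAX)
marking a sync that completed across all priorities. As a minimal sketch of how an
embedder could read that state back, reusing only the prepare_v2/bind_int/step/column_text
helpers already visible in this series (the function name and Option-based signature are
illustrative assumptions, not part of the patches):

use alloc::string::{String, ToString};

use sqlite_nostd::{self as sqlite, Connection, ResultCode};

/// Illustrative sketch: fetch the `last_synced_at` text recorded for one
/// priority code. Passing BucketPriority::SENTINEL.into() (i32::MAX) asks
/// for the timestamp of the last complete sync.
fn read_last_synced_at(
    db: *mut sqlite::sqlite3,
    priority_code: i32,
) -> Result<Option<String>, ResultCode> {
    // language=SQLite
    let statement =
        db.prepare_v2("SELECT last_synced_at FROM ps_sync_state WHERE priority = ?")?;
    statement.bind_int(1, priority_code)?;

    if statement.step()? == ResultCode::ROW {
        // Stored as TEXT; written via SQLite's datetime() in mark_completed.
        Ok(Some(statement.column_text(0)?.to_string()))
    } else {
        Ok(None)
    }
}

Partial completions with a lower priority number stay in the table until a broader sync
supersedes them, which is exactly what the DELETE FROM ps_sync_state WHERE priority < ?1
statement in mark_completed relies on.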
From 4248310c9787163fe66b2e35127ea811f52d6fdb Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 11 Feb 2025 13:44:48 +0100 Subject: [PATCH 13/13] Revert Cargo.toml change --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 024b6c3..790a462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ default-members = ["crates/shell", "crates/sqlite"] [profile.dev] panic = "abort" +strip = true [profile.release] panic = "abort"
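
A subtlety that runs through the whole series: BucketPriority reverses the numeric
comparison, so a smaller number means a higher priority, and the i32::MAX sentinel
introduced in patch 12 ranks below every real priority. The following self-contained
sketch, reduced to the fields these patches define, compiles on its own and demonstrates
the intended ordering:

use core::cmp::Ordering;

/// Reduced copy of the BucketPriority ordering from this series:
/// priority 0 is the highest, larger numbers rank lower.
#[derive(Clone, Copy, PartialEq, Eq)]
struct BucketPriority {
    number: i32,
}

impl PartialOrd for BucketPriority {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Reversed on purpose: a lower number is a *higher* priority.
        Some(self.number.partial_cmp(&other.number)?.reverse())
    }
}

fn main() {
    let highest = BucketPriority { number: 0 };
    let default = BucketPriority { number: 1 };
    let sentinel = BucketPriority { number: i32::MAX };

    assert!(highest > default); // the smaller number wins
    assert!(default > sentinel); // the sentinel ranks below everything
    println!("ordering semantics hold");
}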
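
For SDK authors wiring up the partial-sync entry point from patch 10: sync_local accepts
NULL (or an empty string) for a complete sync, or a JSON object naming the lowest priority
to publish along with the buckets at that priority or higher. A sketch of building that
payload with serde_json, which the core crate already depends on; the bucket names here
are invented for the example:

use serde_json::json;

fn main() {
    // Partial sync down to priority 1 (covering priorities 0 and 1,
    // since 0 is the highest).
    let partial_args = json!({
        "priority": 1,
        "buckets": ["prio0_lists", "prio1_todos"],
    });

    // An SDK passes the encoded string as the `data` column:
    //   INSERT INTO powersync_operations(op, data) VALUES ('sync_local', ?)
    // Binding NULL instead requests a complete sync.
    println!("{}", partial_args);
}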