From 61d0b54396decd9a0643b312e26b02e6dc24fe83 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Sep 2024 15:25:26 +0200 Subject: [PATCH 1/3] Cache checksums in an op_checksum column. --- crates/core/src/checkpoint.rs | 17 +++------------ crates/core/src/operations.rs | 40 +++++++++++++++++++++++++---------- crates/core/src/view_admin.rs | 15 ++++++++++++- 3 files changed, 46 insertions(+), 26 deletions(-) diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 10e434d..30cf11c 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -35,28 +35,20 @@ fn powersync_validate_checkpoint_impl( // language=SQLite let statement = db.prepare_v2( "WITH -bucket_list(bucket, lower_op_id, checksum) AS ( +bucket_list(bucket, checksum) AS ( SELECT json_extract(json_each.value, '$.bucket') as bucket, - 0 as lower_op_id, json_extract(json_each.value, '$.checksum') as checksum FROM json_each(json_extract(?1, '$.buckets')) ) SELECT bucket_list.bucket as bucket, IFNULL(buckets.add_checksum, 0) as add_checksum, - IFNULL(SUM(oplog.hash), 0) as oplog_checksum, - COUNT(oplog.op_id) as count, - IFNULL(MAX(oplog.op_id), 0) as last_op_id, - IFNULL(buckets.last_applied_op, 0) as last_applied_op, + IFNULL(buckets.op_checksum, 0) as oplog_checksum, bucket_list.checksum as expected_checksum FROM bucket_list LEFT OUTER JOIN ps_buckets AS buckets ON buckets.name = bucket_list.bucket - LEFT OUTER JOIN ps_oplog AS oplog ON - bucket_list.bucket = oplog.bucket AND - oplog.op_id <= CAST(json_extract(?1, '$.last_op_id') as INTEGER) AND - oplog.op_id > bucket_list.lower_op_id GROUP BY bucket_list.bucket", )?; @@ -69,10 +61,7 @@ GROUP BY bucket_list.bucket", // checksums with column_int are wrapped to i32 by SQLite let add_checksum = statement.column_int(1)?; let oplog_checksum = statement.column_int(2)?; - let _count = statement.column_int(3)?; - let _last_op_id = statement.column_int64(4)?; - let _last_applied_op = statement.column_int64(5)?; - 
let expected_checksum = statement.column_int(6)?; + let expected_checksum = statement.column_int(3)?; // wrapping add is like +, but safely overflows let checksum = oplog_checksum.wrapping_add(add_checksum); diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index e3802e0..d4e380f 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -97,6 +97,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let mut last_op: Option = None; let mut add_checksum: i32 = 0; + let mut op_checksum: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { let op_id = iterate_statement.column_int64(0)?; @@ -126,6 +127,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let superseded_op = supersede_statement.column_int64(0)?; let supersede_checksum = supersede_statement.column_int(1)?; add_checksum = add_checksum.wrapping_add(supersede_checksum); + op_checksum = op_checksum.wrapping_sub(supersede_checksum); if superseded_op <= last_applied_op { // Superseded an operation previously applied - we cannot skip removes @@ -172,6 +174,8 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super insert_statement.bind_int(8, checksum)?; insert_statement.exec()?; + + op_checksum = op_checksum.wrapping_add(checksum); } else if op == "MOVE" { add_checksum = add_checksum.wrapping_add(checksum); } else if op == "CLEAR" { @@ -185,7 +189,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super // We also replace the checksum with the checksum of the CLEAR op. 
// language=SQLite let clear_statement2 = db.prepare_v2( - "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2", + "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2", )?; clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?; clear_statement2.bind_int(1, checksum)?; @@ -193,6 +197,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super add_checksum = 0; last_applied_op = 0; + op_checksum = 0; } } @@ -201,12 +206,14 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let statement = db.prepare_v2( "UPDATE ps_buckets SET last_op = ?2, - add_checksum = add_checksum + ?3 + add_checksum = (add_checksum + ?3) & 0xffffffff, + op_checksum = (op_checksum + ?4) & 0xffffffff WHERE name = ?1", )?; statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; statement.bind_int64(2, *last_op)?; statement.bind_int(3, add_checksum)?; + statement.bind_int(4, op_checksum)?; statement.exec()?; } @@ -216,17 +223,27 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { // language=SQLite - let statement = - db.prepare_v2("SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?; + let statement = db.prepare_v2( + " +SELECT + name, + last_applied_op, + (SELECT IFNULL(SUM(oplog.hash), 0) + FROM ps_oplog oplog + WHERE oplog.bucket = ps_buckets.name + AND oplog.op_id <= ps_buckets.last_applied_op + AND (oplog.superseded = 1 OR oplog.op != 3) + ) as checksum +FROM ps_buckets +WHERE ps_buckets.pending_delete = 0", + )?; // language=SQLite let update_statement = db.prepare_v2( - "UPDATE ps_buckets - SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0) - FROM ps_oplog AS oplog - WHERE (superseded = 1 OR op != 3) - AND oplog.bucket = ?1 - AND oplog.op_id <= ?2) + " + UPDATE ps_buckets + SET add_checksum = 
(add_checksum + ?2) & 0xffffffff, + op_checksum = (op_checksum - ?2) & 0xffffffff WHERE ps_buckets.name = ?1", )?; @@ -243,9 +260,10 @@ pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQL // Note: Each iteration here may be run in a separate transaction. let name = statement.column_text(0)?; let last_applied_op = statement.column_int64(1)?; + let checksum = statement.column_int(2)?; update_statement.bind_text(1, name, sqlite::Destructor::STATIC)?; - update_statement.bind_int64(2, last_applied_op)?; + update_statement.bind_int(2, checksum)?; update_statement.exec()?; diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index bc05706..0af4366 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -131,7 +131,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations return Err(SQLiteError::from(ResultCode::ABORT)); } - const CODE_VERSION: i32 = 3; + const CODE_VERSION: i32 = 4; let mut current_version = current_version_stmt.column_int(0)?; @@ -248,6 +248,19 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object(' ").into_db_result(local_db)?; } + if current_version < 4 { + // language=SQLite + local_db.exec_safe("\ +ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0; + +UPDATE ps_buckets SET op_checksum = ( + SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name +); + +INSERT INTO ps_migration(id, down_migrations) VALUES(4, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'))); + ").into_db_result(local_db)?; + } + Ok(String::from("")) } From 7aef64e46ecf61bc20621972a41ac135177209ce Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Sep 2024 15:42:31 +0200 Subject: [PATCH 2/3] Only compact buckets when we need to. 
--- crates/core/src/operations.rs | 14 +++++++++++--- crates/core/src/view_admin.rs | 9 ++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index d4e380f..147638a 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -98,6 +98,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let mut last_op: Option = None; let mut add_checksum: i32 = 0; let mut op_checksum: i32 = 0; + let mut pending_compact: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { let op_id = iterate_statement.column_int64(0)?; @@ -176,6 +177,12 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super insert_statement.exec()?; op_checksum = op_checksum.wrapping_add(checksum); + + if opi == 4 { + // We persisted a REMOVE statement, so the bucket needs + // to be compacted at some point. + pending_compact = 1; + } } else if op == "MOVE" { add_checksum = add_checksum.wrapping_add(checksum); } else if op == "CLEAR" { @@ -207,13 +214,15 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super "UPDATE ps_buckets SET last_op = ?2, add_checksum = (add_checksum + ?3) & 0xffffffff, - op_checksum = (op_checksum + ?4) & 0xffffffff + op_checksum = (op_checksum + ?4) & 0xffffffff, + pending_compact = (pending_compact OR ?5) WHERE name = ?1", )?; statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; statement.bind_int64(2, *last_op)?; statement.bind_int(3, add_checksum)?; statement.bind_int(4, op_checksum)?; + statement.bind_int(5, pending_compact)?; statement.exec()?; } @@ -235,7 +244,7 @@ SELECT AND (oplog.superseded = 1 OR oplog.op != 3) ) as checksum FROM ps_buckets -WHERE ps_buckets.pending_delete = 0", +WHERE ps_buckets.pending_delete = 0 AND ps_buckets.pending_compact = 1", )?; // language=SQLite @@ -264,7 +273,6 @@ WHERE ps_buckets.pending_delete = 0", update_statement.bind_text(1, name, 
sqlite::Destructor::STATIC)?; update_statement.bind_int(2, checksum)?; - update_statement.exec()?; // Must use the same values as above diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 0af4366..392c886 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -252,12 +252,19 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object(' // language=SQLite local_db.exec_safe("\ ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0; +ALTER TABLE ps_buckets ADD COLUMN pending_compact INTEGER NOT NULL DEFAULT 0; UPDATE ps_buckets SET op_checksum = ( SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name ); -INSERT INTO ps_migration(id, down_migrations) VALUES(4, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'))); +INSERT INTO ps_migration(id, down_migrations) + VALUES(4, + json_array( + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'), + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN pending_compact') + )); ").into_db_result(local_db)?; } From 59f3ac236c6ff490588ce7bb00f12f52246c380a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Sep 2024 16:52:25 +0200 Subject: [PATCH 3/3] Compact based on remove operation count. 
--- crates/core/src/operations.rs | 19 +++++++++++++------ crates/core/src/view_admin.rs | 4 ++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 147638a..b84b682 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -98,7 +98,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super let mut last_op: Option = None; let mut add_checksum: i32 = 0; let mut op_checksum: i32 = 0; - let mut pending_compact: i32 = 0; + let mut remove_operations: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { let op_id = iterate_statement.column_int64(0)?; @@ -181,7 +181,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super if opi == 4 { // We persisted a REMOVE statement, so the bucket needs // to be compacted at some point. - pending_compact = 1; + remove_operations += 1; } } else if op == "MOVE" { add_checksum = add_checksum.wrapping_add(checksum); @@ -215,14 +215,14 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super SET last_op = ?2, add_checksum = (add_checksum + ?3) & 0xffffffff, op_checksum = (op_checksum + ?4) & 0xffffffff, - pending_compact = (pending_compact OR ?5) + remove_operations = (remove_operations + ?5) WHERE name = ?1", )?; statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; statement.bind_int64(2, *last_op)?; statement.bind_int(3, add_checksum)?; statement.bind_int(4, op_checksum)?; - statement.bind_int(5, pending_compact)?; + statement.bind_int(5, remove_operations)?; statement.exec()?; } @@ -244,15 +244,22 @@ SELECT AND (oplog.superseded = 1 OR oplog.op != 3) ) as checksum FROM ps_buckets -WHERE ps_buckets.pending_delete = 0 AND ps_buckets.pending_compact = 1", +WHERE ps_buckets.pending_delete = 0 AND + ps_buckets.remove_operations >= CASE + WHEN ?1 = '' THEN 1 + ELSE IFNULL(?1 ->> 'threshold', 1) + END", )?; + // Compact buckets whose remove_operations count has reached the 'threshold' given in the JSON argument (1 if the argument is empty or has no threshold)
+ statement.bind_text(1, _data, sqlite::Destructor::STATIC); // language=SQLite let update_statement = db.prepare_v2( " UPDATE ps_buckets SET add_checksum = (add_checksum + ?2) & 0xffffffff, - op_checksum = (op_checksum - ?2) & 0xffffffff + op_checksum = (op_checksum - ?2) & 0xffffffff, + remove_operations = 0 WHERE ps_buckets.name = ?1", )?; diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 392c886..f7c7613 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -252,7 +252,7 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object(' // language=SQLite local_db.exec_safe("\ ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0; -ALTER TABLE ps_buckets ADD COLUMN pending_compact INTEGER NOT NULL DEFAULT 0; +ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0; UPDATE ps_buckets SET op_checksum = ( SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name @@ -263,7 +263,7 @@ INSERT INTO ps_migration(id, down_migrations) json_array( json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'), - json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN pending_compact') + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations') )); ").into_db_result(local_db)?; }