Skip to content

Persist Checksums #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions crates/core/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,20 @@ fn powersync_validate_checkpoint_impl(
// language=SQLite
let statement = db.prepare_v2(
"WITH
bucket_list(bucket, lower_op_id, checksum) AS (
bucket_list(bucket, checksum) AS (
SELECT
json_extract(json_each.value, '$.bucket') as bucket,
0 as lower_op_id,
json_extract(json_each.value, '$.checksum') as checksum
FROM json_each(json_extract(?1, '$.buckets'))
)
SELECT
bucket_list.bucket as bucket,
IFNULL(buckets.add_checksum, 0) as add_checksum,
IFNULL(SUM(oplog.hash), 0) as oplog_checksum,
COUNT(oplog.op_id) as count,
IFNULL(MAX(oplog.op_id), 0) as last_op_id,
IFNULL(buckets.last_applied_op, 0) as last_applied_op,
IFNULL(buckets.op_checksum, 0) as oplog_checksum,
bucket_list.checksum as expected_checksum
FROM bucket_list
LEFT OUTER JOIN ps_buckets AS buckets ON
buckets.name = bucket_list.bucket
LEFT OUTER JOIN ps_oplog AS oplog ON
bucket_list.bucket = oplog.bucket AND
oplog.op_id <= CAST(json_extract(?1, '$.last_op_id') as INTEGER) AND
oplog.op_id > bucket_list.lower_op_id
GROUP BY bucket_list.bucket",
)?;

Expand All @@ -69,10 +61,7 @@ GROUP BY bucket_list.bucket",
// checksums with column_int are wrapped to i32 by SQLite
let add_checksum = statement.column_int(1)?;
let oplog_checksum = statement.column_int(2)?;
let _count = statement.column_int(3)?;
let _last_op_id = statement.column_int64(4)?;
let _last_applied_op = statement.column_int64(5)?;
let expected_checksum = statement.column_int(6)?;
let expected_checksum = statement.column_int(3)?;

// wrapping add is like +, but safely overflows
let checksum = oplog_checksum.wrapping_add(add_checksum);
Expand Down
57 changes: 45 additions & 12 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super

let mut last_op: Option<i64> = None;
let mut add_checksum: i32 = 0;
let mut op_checksum: i32 = 0;
let mut remove_operations: i32 = 0;

while iterate_statement.step()? == ResultCode::ROW {
let op_id = iterate_statement.column_int64(0)?;
Expand Down Expand Up @@ -126,6 +128,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
let superseded_op = supersede_statement.column_int64(0)?;
let supersede_checksum = supersede_statement.column_int(1)?;
add_checksum = add_checksum.wrapping_add(supersede_checksum);
op_checksum = op_checksum.wrapping_sub(supersede_checksum);

if superseded_op <= last_applied_op {
// Superseded an operation previously applied - we cannot skip removes
Expand Down Expand Up @@ -172,6 +175,14 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super

insert_statement.bind_int(8, checksum)?;
insert_statement.exec()?;

op_checksum = op_checksum.wrapping_add(checksum);

if opi == 4 {
// We persisted a REMOVE statement, so the bucket needs
// to be compacted at some point.
remove_operations += 1;
}
} else if op == "MOVE" {
add_checksum = add_checksum.wrapping_add(checksum);
} else if op == "CLEAR" {
Expand All @@ -185,14 +196,15 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
// We also replace the checksum with the checksum of the CLEAR op.
// language=SQLite
let clear_statement2 = db.prepare_v2(
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2",
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2",
)?;
clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?;
clear_statement2.bind_int(1, checksum)?;
clear_statement2.exec()?;

add_checksum = 0;
last_applied_op = 0;
op_checksum = 0;
}
}

Expand All @@ -201,12 +213,16 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
let statement = db.prepare_v2(
"UPDATE ps_buckets
SET last_op = ?2,
add_checksum = add_checksum + ?3
add_checksum = (add_checksum + ?3) & 0xffffffff,
op_checksum = (op_checksum + ?4) & 0xffffffff,
remove_operations = (remove_operations + ?5)
WHERE name = ?1",
)?;
statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
statement.bind_int64(2, *last_op)?;
statement.bind_int(3, add_checksum)?;
statement.bind_int(4, op_checksum)?;
statement.bind_int(5, remove_operations)?;

statement.exec()?;
}
Expand All @@ -216,17 +232,34 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super

pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
// language=SQLite
let statement =
db.prepare_v2("SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?;
let statement = db.prepare_v2(
"
SELECT
name,
last_applied_op,
(SELECT IFNULL(SUM(oplog.hash), 0)
FROM ps_oplog oplog
WHERE oplog.bucket = ps_buckets.name
AND oplog.op_id <= ps_buckets.last_applied_op
AND (oplog.superseded = 1 OR oplog.op != 3)
) as checksum
FROM ps_buckets
WHERE ps_buckets.pending_delete = 0 AND
ps_buckets.remove_operations >= CASE
WHEN ?1 = '' THEN 1
ELSE IFNULL(?1 ->> 'threshold', 1)
END",
)?;
// Compact bucket if there are 50 or more operations
statement.bind_text(1, _data, sqlite::Destructor::STATIC);

// language=SQLite
let update_statement = db.prepare_v2(
"UPDATE ps_buckets
SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
FROM ps_oplog AS oplog
WHERE (superseded = 1 OR op != 3)
AND oplog.bucket = ?1
AND oplog.op_id <= ?2)
"
UPDATE ps_buckets
SET add_checksum = (add_checksum + ?2) & 0xffffffff,
op_checksum = (op_checksum - ?2) & 0xffffffff,
remove_operations = 0
WHERE ps_buckets.name = ?1",
)?;

Expand All @@ -243,10 +276,10 @@ pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQL
// Note: Each iteration here may be run in a separate transaction.
let name = statement.column_text(0)?;
let last_applied_op = statement.column_int64(1)?;
let checksum = statement.column_int(2)?;

update_statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
update_statement.bind_int64(2, last_applied_op)?;

update_statement.bind_int(2, checksum)?;
update_statement.exec()?;

// Must use the same values as above
Expand Down
22 changes: 21 additions & 1 deletion crates/core/src/view_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
return Err(SQLiteError::from(ResultCode::ABORT));
}

const CODE_VERSION: i32 = 3;
const CODE_VERSION: i32 = 4;

let mut current_version = current_version_stmt.column_int(0)?;

Expand Down Expand Up @@ -250,6 +250,26 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('
").into_db_result(local_db)?;
}

if current_version < 4 {
// language=SQLite
local_db.exec_safe("\
ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0;
ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0;

UPDATE ps_buckets SET op_checksum = (
SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name
);

INSERT INTO ps_migration(id, down_migrations)
VALUES(4,
json_array(
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'),
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'),
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations')
));
").into_db_result(local_db)?;
}

setup_internal_views(local_db)?;

Ok(String::from(""))
Expand Down
Loading