Skip to content

Commit 61d0b54

Browse files
committed
Cache checksums in an op_checksum column.
1 parent 4aeb732 commit 61d0b54

File tree

3 files changed

+46
-26
lines changed

3 files changed

+46
-26
lines changed

crates/core/src/checkpoint.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,28 +35,20 @@ fn powersync_validate_checkpoint_impl(
3535
// language=SQLite
3636
let statement = db.prepare_v2(
3737
"WITH
38-
bucket_list(bucket, lower_op_id, checksum) AS (
38+
bucket_list(bucket, checksum) AS (
3939
SELECT
4040
json_extract(json_each.value, '$.bucket') as bucket,
41-
0 as lower_op_id,
4241
json_extract(json_each.value, '$.checksum') as checksum
4342
FROM json_each(json_extract(?1, '$.buckets'))
4443
)
4544
SELECT
4645
bucket_list.bucket as bucket,
4746
IFNULL(buckets.add_checksum, 0) as add_checksum,
48-
IFNULL(SUM(oplog.hash), 0) as oplog_checksum,
49-
COUNT(oplog.op_id) as count,
50-
IFNULL(MAX(oplog.op_id), 0) as last_op_id,
51-
IFNULL(buckets.last_applied_op, 0) as last_applied_op,
47+
IFNULL(buckets.op_checksum, 0) as oplog_checksum,
5248
bucket_list.checksum as expected_checksum
5349
FROM bucket_list
5450
LEFT OUTER JOIN ps_buckets AS buckets ON
5551
buckets.name = bucket_list.bucket
56-
LEFT OUTER JOIN ps_oplog AS oplog ON
57-
bucket_list.bucket = oplog.bucket AND
58-
oplog.op_id <= CAST(json_extract(?1, '$.last_op_id') as INTEGER) AND
59-
oplog.op_id > bucket_list.lower_op_id
6052
GROUP BY bucket_list.bucket",
6153
)?;
6254

@@ -69,10 +61,7 @@ GROUP BY bucket_list.bucket",
6961
// checksums with column_int are wrapped to i32 by SQLite
7062
let add_checksum = statement.column_int(1)?;
7163
let oplog_checksum = statement.column_int(2)?;
72-
let _count = statement.column_int(3)?;
73-
let _last_op_id = statement.column_int64(4)?;
74-
let _last_applied_op = statement.column_int64(5)?;
75-
let expected_checksum = statement.column_int(6)?;
64+
let expected_checksum = statement.column_int(3)?;
7665

7766
// wrapping add is like +, but safely overflows
7867
let checksum = oplog_checksum.wrapping_add(add_checksum);

crates/core/src/operations.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
9797

9898
let mut last_op: Option<i64> = None;
9999
let mut add_checksum: i32 = 0;
100+
let mut op_checksum: i32 = 0;
100101

101102
while iterate_statement.step()? == ResultCode::ROW {
102103
let op_id = iterate_statement.column_int64(0)?;
@@ -126,6 +127,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
126127
let superseded_op = supersede_statement.column_int64(0)?;
127128
let supersede_checksum = supersede_statement.column_int(1)?;
128129
add_checksum = add_checksum.wrapping_add(supersede_checksum);
130+
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
129131

130132
if superseded_op <= last_applied_op {
131133
// Superseded an operation previously applied - we cannot skip removes
@@ -172,6 +174,8 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
172174

173175
insert_statement.bind_int(8, checksum)?;
174176
insert_statement.exec()?;
177+
178+
op_checksum = op_checksum.wrapping_add(checksum);
175179
} else if op == "MOVE" {
176180
add_checksum = add_checksum.wrapping_add(checksum);
177181
} else if op == "CLEAR" {
@@ -185,14 +189,15 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
185189
// We also replace the checksum with the checksum of the CLEAR op.
186190
// language=SQLite
187191
let clear_statement2 = db.prepare_v2(
188-
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2",
192+
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE name = ?2",
189193
)?;
190194
clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?;
191195
clear_statement2.bind_int(1, checksum)?;
192196
clear_statement2.exec()?;
193197

194198
add_checksum = 0;
195199
last_applied_op = 0;
200+
op_checksum = 0;
196201
}
197202
}
198203

@@ -201,12 +206,14 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
201206
let statement = db.prepare_v2(
202207
"UPDATE ps_buckets
203208
SET last_op = ?2,
204-
add_checksum = add_checksum + ?3
209+
add_checksum = (add_checksum + ?3) & 0xffffffff,
210+
op_checksum = (op_checksum + ?4) & 0xffffffff
205211
WHERE name = ?1",
206212
)?;
207213
statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
208214
statement.bind_int64(2, *last_op)?;
209215
statement.bind_int(3, add_checksum)?;
216+
statement.bind_int(4, op_checksum)?;
210217

211218
statement.exec()?;
212219
}
@@ -216,17 +223,27 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
216223

217224
pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
218225
// language=SQLite
219-
let statement =
220-
db.prepare_v2("SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?;
226+
let statement = db.prepare_v2(
227+
"
228+
SELECT
229+
name,
230+
last_applied_op,
231+
(SELECT IFNULL(SUM(oplog.hash), 0)
232+
FROM ps_oplog oplog
233+
WHERE oplog.bucket = ps_buckets.name
234+
AND oplog.op_id <= ps_buckets.last_applied_op
235+
AND (oplog.superseded = 1 OR oplog.op != 3)
236+
) as checksum
237+
FROM ps_buckets
238+
WHERE ps_buckets.pending_delete = 0",
239+
)?;
221240

222241
// language=SQLite
223242
let update_statement = db.prepare_v2(
224-
"UPDATE ps_buckets
225-
SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0)
226-
FROM ps_oplog AS oplog
227-
WHERE (superseded = 1 OR op != 3)
228-
AND oplog.bucket = ?1
229-
AND oplog.op_id <= ?2)
243+
"
244+
UPDATE ps_buckets
245+
SET add_checksum = (add_checksum + ?2) & 0xffffffff,
246+
op_checksum = (op_checksum - ?2) & 0xffffffff
230247
WHERE ps_buckets.name = ?1",
231248
)?;
232249

@@ -243,9 +260,10 @@ pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQL
243260
// Note: Each iteration here may be run in a separate transaction.
244261
let name = statement.column_text(0)?;
245262
let last_applied_op = statement.column_int64(1)?;
263+
let checksum = statement.column_int(2)?;
246264

247265
update_statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
248-
update_statement.bind_int64(2, last_applied_op)?;
266+
update_statement.bind_int(2, checksum)?;
249267

250268
update_statement.exec()?;
251269

crates/core/src/view_admin.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
131131
return Err(SQLiteError::from(ResultCode::ABORT));
132132
}
133133

134-
const CODE_VERSION: i32 = 3;
134+
const CODE_VERSION: i32 = 4;
135135

136136
let mut current_version = current_version_stmt.column_int(0)?;
137137

@@ -248,6 +248,19 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('
248248
").into_db_result(local_db)?;
249249
}
250250

251+
if current_version < 4 {
252+
// language=SQLite
253+
local_db.exec_safe("\
254+
ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0;
255+
256+
UPDATE ps_buckets SET op_checksum = (
257+
SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name
258+
);
259+
260+
INSERT INTO ps_migration(id, down_migrations) VALUES(4, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum')));
261+
").into_db_result(local_db)?;
262+
}
263+
251264
Ok(String::from(""))
252265
}
253266

0 commit comments

Comments
 (0)