Skip to content

Commit a8c05e6

Browse files
committed
WIP: restructure storage format.
1 parent 59f3ac2 commit a8c05e6

File tree

5 files changed

+238
-189
lines changed

5 files changed

+238
-189
lines changed

crates/core/src/operations.rs

+62-112
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,22 @@ FROM json_each(?) e",
6262
let supersede_statement = db.prepare_v2(
6363
"\
6464
DELETE FROM ps_oplog
65-
WHERE ps_oplog.superseded = 0
66-
AND unlikely(ps_oplog.bucket = ?1)
65+
WHERE unlikely(ps_oplog.bucket = ?1)
6766
AND ps_oplog.key = ?2
6867
RETURNING op_id, hash",
6968
)?;
7069
supersede_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
7170

7271
// language=SQLite
7372
let insert_statement = db.prepare_v2("\
74-
INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)")?;
73+
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
7574
insert_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
7675

76+
let updated_row_statement = db.prepare_v2(
77+
"\
78+
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
79+
)?;
80+
7781
// We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
7882
// We can consider splitting this into separate SELECT and INSERT statements.
7983
// language=SQLite
@@ -98,7 +102,6 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
98102
let mut last_op: Option<i64> = None;
99103
let mut add_checksum: i32 = 0;
100104
let mut op_checksum: i32 = 0;
101-
let mut remove_operations: i32 = 0;
102105

103106
while iterate_statement.step()? == ResultCode::ROW {
104107
let op_id = iterate_statement.column_int64(0)?;
@@ -140,57 +143,75 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
140143
}
141144
supersede_statement.reset()?;
142145

143-
let should_skip_remove = !superseded && op == "REMOVE";
144-
if should_skip_remove {
145-
// If a REMOVE statement did not replace (supersede) any previous
146-
// operations, we do not need to persist it.
147-
// The same applies if the bucket was not synced to the local db yet,
148-
// even if it did supersede another operation.
149-
// Handle the same as MOVE.
146+
if (op == "REMOVE") {
147+
let should_skip_remove = !superseded;
148+
150149
add_checksum = add_checksum.wrapping_add(checksum);
150+
151+
if !should_skip_remove {
152+
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
153+
updated_row_statement.bind_text(
154+
1,
155+
object_type,
156+
sqlite::Destructor::STATIC,
157+
)?;
158+
updated_row_statement.bind_text(
159+
2,
160+
object_id,
161+
sqlite::Destructor::STATIC,
162+
)?;
163+
updated_row_statement.exec()?;
164+
}
165+
}
166+
151167
continue;
152168
}
153169

154-
let opi = if op == "PUT" { 3 } else { 4 };
155170
insert_statement.bind_int64(2, op_id)?;
156-
insert_statement.bind_int(3, opi)?;
157171
if key != "" {
158-
insert_statement.bind_text(4, &key, sqlite::Destructor::STATIC)?;
172+
insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?;
159173
} else {
160-
insert_statement.bind_null(4)?;
174+
insert_statement.bind_null(3)?;
161175
}
162176

163177
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
164-
insert_statement.bind_text(5, object_type, sqlite::Destructor::STATIC)?;
165-
insert_statement.bind_text(6, object_id, sqlite::Destructor::STATIC)?;
178+
insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?;
179+
insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?;
166180
} else {
181+
insert_statement.bind_null(4)?;
167182
insert_statement.bind_null(5)?;
168-
insert_statement.bind_null(6)?;
169183
}
170184
if let Ok(data) = op_data {
171-
insert_statement.bind_text(7, data, sqlite::Destructor::STATIC)?;
185+
insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?;
172186
} else {
173-
insert_statement.bind_null(7)?;
187+
insert_statement.bind_null(6)?;
174188
}
175189

176-
insert_statement.bind_int(8, checksum)?;
190+
insert_statement.bind_int(7, checksum)?;
177191
insert_statement.exec()?;
178192

179193
op_checksum = op_checksum.wrapping_add(checksum);
180-
181-
if opi == 4 {
182-
// We persisted a REMOVE statement, so the bucket needs
183-
// to be compacted at some point.
184-
remove_operations += 1;
185-
}
186194
} else if op == "MOVE" {
187195
add_checksum = add_checksum.wrapping_add(checksum);
188196
} else if op == "CLEAR" {
189197
// Any remaining PUT operations should get an implicit REMOVE
190198
// language=SQLite
191-
let clear_statement = db.prepare_v2("UPDATE ps_oplog SET op=4, data=NULL, hash=0 WHERE (op=3 OR op=4) AND bucket=?1").into_db_result(db)?;
192-
clear_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
193-
clear_statement.exec()?;
199+
let clear_statement1 = db
200+
.prepare_v2(
201+
"INSERT INTO ps_updated_rows(row_type, row_id)
202+
SELECT row_type, row_id
203+
FROM ps_oplog
204+
WHERE bucket = ?1",
205+
)
206+
.into_db_result(db)?;
207+
clear_statement1.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
208+
clear_statement1.exec()?;
209+
210+
let clear_statement2 = db
211+
.prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1")
212+
.into_db_result(db)?;
213+
clear_statement2.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
214+
clear_statement2.exec()?;
194215

195216
// And we need to re-apply all of those.
196217
// We also replace the checksum with the checksum of the CLEAR op.
@@ -214,15 +235,13 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
214235
"UPDATE ps_buckets
215236
SET last_op = ?2,
216237
add_checksum = (add_checksum + ?3) & 0xffffffff,
217-
op_checksum = (op_checksum + ?4) & 0xffffffff,
218-
remove_operations = (remove_operations + ?5)
238+
op_checksum = (op_checksum + ?4) & 0xffffffff
219239
WHERE name = ?1",
220240
)?;
221241
statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
222242
statement.bind_int64(2, *last_op)?;
223243
statement.bind_int(3, add_checksum)?;
224244
statement.bind_int(4, op_checksum)?;
225-
statement.bind_int(5, remove_operations)?;
226245

227246
statement.exec()?;
228247
}
@@ -231,108 +250,39 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
231250
}
232251

233252
pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
234-
// language=SQLite
235-
let statement = db.prepare_v2(
236-
"
237-
SELECT
238-
name,
239-
last_applied_op,
240-
(SELECT IFNULL(SUM(oplog.hash), 0)
241-
FROM ps_oplog oplog
242-
WHERE oplog.bucket = ps_buckets.name
243-
AND oplog.op_id <= ps_buckets.last_applied_op
244-
AND (oplog.superseded = 1 OR oplog.op != 3)
245-
) as checksum
246-
FROM ps_buckets
247-
WHERE ps_buckets.pending_delete = 0 AND
248-
ps_buckets.remove_operations >= CASE
249-
WHEN ?1 = '' THEN 1
250-
ELSE IFNULL(?1 ->> 'threshold', 1)
251-
END",
252-
)?;
253-
// Compact bucket if there are 50 or more operations
254-
statement.bind_text(1, _data, sqlite::Destructor::STATIC);
255-
256-
// language=SQLite
257-
let update_statement = db.prepare_v2(
258-
"
259-
UPDATE ps_buckets
260-
SET add_checksum = (add_checksum + ?2) & 0xffffffff,
261-
op_checksum = (op_checksum - ?2) & 0xffffffff,
262-
remove_operations = 0
263-
WHERE ps_buckets.name = ?1",
264-
)?;
265-
266-
// language=SQLite
267-
let delete_statement = db.prepare_v2(
268-
"DELETE
269-
FROM ps_oplog
270-
WHERE (superseded = 1 OR op != 3)
271-
AND bucket = ?1
272-
AND op_id <= ?2",
273-
)?;
274-
275-
while statement.step()? == ResultCode::ROW {
276-
// Note: Each iteration here may be run in a separate transaction.
277-
let name = statement.column_text(0)?;
278-
let last_applied_op = statement.column_int64(1)?;
279-
let checksum = statement.column_int(2)?;
280-
281-
update_statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
282-
update_statement.bind_int(2, checksum)?;
283-
update_statement.exec()?;
284-
285-
// Must use the same values as above
286-
delete_statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
287-
delete_statement.bind_int64(2, last_applied_op)?;
288-
delete_statement.exec()?;
289-
}
253+
// No-op
290254

291255
Ok(())
292256
}
293257

294258
pub fn delete_pending_buckets(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> {
295-
// language=SQLite
296-
let statement = db.prepare_v2(
297-
"DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)")?;
298-
statement.exec()?;
299-
300-
// language=SQLite
301-
let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op")?;
302-
statement.exec()?;
259+
// No-op
303260

304261
Ok(())
305262
}
306263

307264
pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> {
308-
let id = gen_uuid();
309-
let new_name = format!("$delete_{}_{}", name, id.hyphenated().to_string());
310-
311265
// language=SQLite
312266
let statement = db.prepare_v2(
313-
"UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2",
267+
"\
268+
INSERT INTO ps_updated_rows(row_type, row_id)
269+
SELECT row_type, row_id
270+
FROM ps_oplog
271+
WHERE bucket = ?1",
314272
)?;
315-
statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?;
316-
statement.bind_text(2, &name, sqlite::Destructor::STATIC)?;
273+
statement.bind_text(1, &name, sqlite::Destructor::STATIC)?;
317274
statement.exec()?;
318275

319276
// Rename bucket
320277
// language=SQLite
321-
let statement = db.prepare_v2("UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2")?;
322-
statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?;
323-
statement.bind_text(2, name, sqlite::Destructor::STATIC)?;
278+
let statement = db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?;
279+
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
324280
statement.exec()?;
325281

326282
// language=SQLite
327283
let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1")?;
328284
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
329285
statement.exec()?;
330286

331-
// language=SQLite
332-
let statement = db.prepare_v2(
333-
"INSERT INTO ps_buckets(name, pending_delete, last_op) SELECT ?1, 1, IFNULL(MAX(op_id), 0) FROM ps_oplog WHERE bucket = ?1")?;
334-
statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?;
335-
statement.exec()?;
336-
337287
Ok(())
338288
}

0 commit comments

Comments
 (0)