Skip to content

Commit 8107e42

Browse files
committed
Rewrite the "sync_local" query.
1 parent 024fda8 commit 8107e42

File tree

1 file changed

+72
-24
lines changed

1 file changed

+72
-24
lines changed

crates/core/src/sync_local.rs

+72-24
Original file line numberDiff line numberDiff line change
@@ -55,50 +55,98 @@ pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteEr
5555

5656
// Query for updated objects
5757

58+
// Be careful with modifying this query - it is critical for performance. When modifying, make sure to check
59+
// performance of the query with a large number of rows, and also with a large number of duplicate rows (same row_id).
60+
//
61+
// This form uses a subquery with max(r.op_id) instead of a JOIN to get the latest oplog entry for each updated row.
62+
// The subquery is because:
63+
// 1. We need the GROUP BY to execute _before_ looking up the latest op_id for each row, otherwise
64+
// we get terrible performance if there are lots of duplicate ids (O(N^2) performance).
65+
// 2. We want to avoid using a second GROUP BY, which would use a secondary TEMP B-TREE.
66+
//
67+
// It does not appear to be feasible to avoid the single TEMP B-TREE here.
68+
//
69+
// The query roughly does the following:
70+
// 1. Filter oplog by the ops added but not applied yet (oplog b). These are not unique.
71+
// 2. Use GROUP BY to get unique rows. This adds some overhead because of the TEMP B-TREE, but is necessary
72+
// to cover cases of duplicate rows. DISTINCT would do the same in theory, but is slower than GROUP BY in practice.
73+
// 3. For each op, find the latest version of the data. This is done using a subquery, with `max(r.op_id)` to
74+
// select the latest version.
75+
//
76+
// The subquery instead of a JOIN is because:
77+
// 1. We need the GROUP BY to execute _before_ looking up the latest op_id for each row, otherwise
78+
// we get terrible performance if there are lots of duplicate ids (O(N^2) performance).
79+
// 2. We want to avoid using a second GROUP BY, which would use a second TEMP B-TREE.
80+
//
81+
// The `ifnull(data, max(op_id))` clause is a hack to pick the row with the largest op_id, but only select the data.
82+
//
83+
// QUERY PLAN
84+
// |--CO-ROUTINE updated_rows
85+
// | `--COMPOUND QUERY
86+
// | |--LEFT-MOST SUBQUERY
87+
// | | |--SCAN buckets USING COVERING INDEX ps_buckets_name
88+
// | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?)
89+
// | `--UNION ALL
90+
// | `--SCAN ps_updated_rows
91+
// |--SCAN b
92+
// |--USE TEMP B-TREE FOR GROUP BY
93+
// `--CORRELATED SCALAR SUBQUERY 3
94+
// `--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?)
95+
5896
// language=SQLite
5997
let statement = db
6098
.prepare_v2(
6199
"\
62-
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
63-
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
64100
WITH updated_rows AS (
65-
SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
66-
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
67-
AND (b.op_id > buckets.last_applied_op)
68-
UNION SELECT row_type, row_id FROM ps_updated_rows
101+
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
102+
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
103+
AND (b.op_id > buckets.last_applied_op)
104+
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
69105
)
70106
71-
-- 3. Group the objects from different buckets together into a single one (ops).
72-
SELECT b.row_type as type,
73-
b.row_id as id,
74-
r.data as data,
75-
count(r.bucket) as buckets,
76-
/* max() affects which row is used for 'data' */
77-
max(r.op_id) as op_id
78-
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
79-
FROM updated_rows b
80-
LEFT OUTER JOIN ps_oplog AS r
81-
ON r.row_type = b.row_type
82-
AND r.row_id = b.row_id
83-
-- Group for (3)
84-
GROUP BY b.row_type, b.row_id",
107+
SELECT
108+
b.row_type,
109+
b.row_id,
110+
(
111+
SELECT ifnull(r.data, max(r.op_id))
112+
FROM ps_oplog r
113+
WHERE r.row_type = b.row_type
114+
AND r.row_id = b.row_id
115+
) as data
116+
FROM updated_rows b
117+
GROUP BY b.row_type, b.row_id;
118+
",
85119
)
86120
.into_db_result(db)?;
87121

88-
// TODO: cache statements
122+
// An alternative form of the query is this:
123+
//
124+
// SELECT r.row_type as type,
125+
// r.row_id as id,
126+
// r.data as data,
127+
// max(r.op_id) as op_id
128+
// FROM ps_oplog r
129+
// GROUP BY r.row_type, r.row_id;
130+
//
131+
// This form is simple and fast, but does not filter only on updated rows. It also ignores ps_updated_rows.
132+
// We could later add heuristics to use this form on initial sync, or when a large number of rows have been re-synced.
133+
//
134+
// QUERY PLAN
135+
// `--SCAN r USING INDEX ps_oplog_row
136+
137+
// TODO: cache individual statements
89138

90139
while statement.step().into_db_result(db)? == ResultCode::ROW {
91140
let type_name = statement.column_text(0)?;
92141
let id = statement.column_text(1)?;
93-
let buckets = statement.column_int(3)?;
94142
let data = statement.column_text(2);
95143

96144
let table_name = internal_table_name(type_name);
97145

98146
if tables.contains(&table_name) {
99147
let quoted = quote_internal_name(type_name, false);
100148

101-
if buckets == 0 {
149+
if data.is_err() {
102150
// DELETE
103151
let delete_statement = db
104152
.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
@@ -115,7 +163,7 @@ GROUP BY b.row_type, b.row_id",
115163
insert_statement.exec()?;
116164
}
117165
} else {
118-
if buckets == 0 {
166+
if data.is_err() {
119167
// DELETE
120168
// language=SQLite
121169
let delete_statement = db

0 commit comments

Comments
 (0)