Skip to content

Commit fee756b

Browse files
authored
Rewrite the "sync_local" query (#78)
* Add test setup for tracking VFS operations on sync_local. * Optimize queries. * Fix for test concurrency issue. * Expand performance tests; add hard limits. * Change to is_err. * Expand on comments. * Add various queries to the performance tests. * Rename test file. * Move annotations to the test file; add a link to docs.
1 parent 5959ddf commit fee756b

File tree

8 files changed

+504
-51
lines changed

8 files changed

+504
-51
lines changed

crates/core/src/sync_local.rs

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,16 @@ impl<'a> SyncOperation<'a> {
107107
while statement.step().into_db_result(self.db)? == ResultCode::ROW {
108108
let type_name = statement.column_text(0)?;
109109
let id = statement.column_text(1)?;
110-
let buckets = statement.column_int(3);
111110
let data = statement.column_text(2);
112111

113112
let table_name = internal_table_name(type_name);
114113

115114
if self.data_tables.contains(&table_name) {
116115
let quoted = quote_internal_name(type_name, false);
117116

118-
if buckets == 0 {
117+
// is_err() is essentially a NULL check here.
118+
// NULL data means no PUT operations found, so we delete the row.
119+
if data.is_err() {
119120
// DELETE
120121
let delete_statement = self
121122
.db
@@ -134,7 +135,7 @@ impl<'a> SyncOperation<'a> {
134135
insert_statement.exec()?;
135136
}
136137
} else {
137-
if buckets == 0 {
138+
if data.is_err() {
138139
// DELETE
139140
// language=SQLite
140141
let delete_statement = self
@@ -185,32 +186,29 @@ impl<'a> SyncOperation<'a> {
185186
Ok(match &self.partial {
186187
None => {
187188
// Complete sync
189+
// See dart/test/sync_local_performance_test.dart for an annotated version of this query.
188190
self.db
189191
.prepare_v2(
190192
"\
191-
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
192-
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
193193
WITH updated_rows AS (
194-
SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
195-
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
196-
AND (b.op_id > buckets.last_applied_op)
197-
UNION SELECT row_type, row_id FROM ps_updated_rows
194+
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
195+
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
196+
AND (b.op_id > buckets.last_applied_op)
197+
UNION ALL SELECT row_type, row_id FROM ps_updated_rows
198198
)
199199
200-
-- 3. Group the objects from different buckets together into a single one (ops).
201-
SELECT b.row_type as type,
202-
b.row_id as id,
203-
r.data as data,
204-
count(r.bucket) as buckets,
205-
/* max() affects which row is used for 'data' */
206-
max(r.op_id) as op_id
207-
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
208-
FROM updated_rows b
209-
LEFT OUTER JOIN ps_oplog AS r
210-
ON r.row_type = b.row_type
211-
AND r.row_id = b.row_id
212-
-- Group for (3)
213-
GROUP BY b.row_type, b.row_id",
200+
SELECT
201+
b.row_type,
202+
b.row_id,
203+
(
204+
SELECT iif(max(r.op_id), r.data, null)
205+
FROM ps_oplog r
206+
WHERE r.row_type = b.row_type
207+
AND r.row_id = b.row_id
208+
209+
) as data
210+
FROM updated_rows b
211+
GROUP BY b.row_type, b.row_id;",
214212
)
215213
.into_db_result(self.db)?
216214
}
@@ -220,33 +218,38 @@ GROUP BY b.row_type, b.row_id",
220218
.prepare_v2(
221219
"\
222220
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
223-
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
221+
-- We do not do any DISTINCT operation here, since that introduces a temp b-tree.
222+
-- We filter out duplicates using the GROUP BY below.
224223
WITH
225224
involved_buckets (id) AS MATERIALIZED (
226225
SELECT id FROM ps_buckets WHERE ?1 IS NULL
227226
OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
228227
),
229228
updated_rows AS (
230-
SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
231-
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
232-
WHERE buckets.id IN (SELECT id FROM involved_buckets)
229+
SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
230+
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
231+
AND (b.op_id > buckets.last_applied_op)
232+
WHERE buckets.id IN (SELECT id FROM involved_buckets)
233233
)
234234
235-
-- 3. Group the objects from different buckets together into a single one (ops).
236-
SELECT b.row_type as type,
237-
b.row_id as id,
238-
r.data as data,
239-
count(r.bucket) as buckets,
240-
/* max() affects which row is used for 'data' */
241-
max(r.op_id) as op_id
242235
-- 2. Find *all* current ops over different buckets for those objects (oplog r).
243-
FROM updated_rows b
244-
LEFT OUTER JOIN ps_oplog AS r
245-
ON r.row_type = b.row_type
246-
AND r.row_id = b.row_id
247-
AND r.bucket IN (SELECT id FROM involved_buckets)
248-
-- Group for (3)
249-
GROUP BY b.row_type, b.row_id",
236+
SELECT
237+
b.row_type,
238+
b.row_id,
239+
(
240+
-- 3. For each unique row, select the data from the latest oplog entry.
241+
-- The max(r.op_id) clause is used to select the latest oplog entry.
242+
-- The iif is to avoid the max(r.op_id) column ending up in the results.
243+
SELECT iif(max(r.op_id), r.data, null)
244+
FROM ps_oplog r
245+
WHERE r.row_type = b.row_type
246+
AND r.row_id = b.row_id
247+
AND r.bucket IN (SELECT id FROM involved_buckets)
248+
249+
) as data
250+
FROM updated_rows b
251+
-- Group for (2)
252+
GROUP BY b.row_type, b.row_id;",
250253
)
251254
.into_db_result(self.db)?;
252255
stmt.bind_text(1, partial.args, Destructor::STATIC)?;

dart/pubspec.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,10 @@ packages:
293293
dependency: "direct main"
294294
description:
295295
name: sqlite3
296-
sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e"
296+
sha256: c0503c69b44d5714e6abbf4c1f51a3c3cc42b75ce785f44404765e4635481d38
297297
url: "https://pub.dev"
298298
source: hosted
299-
version: "2.7.5"
299+
version: "2.7.6"
300300
sqlite3_test:
301301
dependency: "direct dev"
302302
description:

dart/pubspec.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ description: Tests for powersync-sqlite-core
55
environment:
66
sdk: ^3.4.0
77
dependencies:
8-
sqlite3: ^2.4.5
8+
sqlite3: ^2.7.6
99
dev_dependencies:
1010
test: ^1.25.0
1111
file: ^7.0.1
1212
sqlite3_test: ^0.1.1
13-
fake_async: ^1.3.3
13+
fake_async: ^1.3.3

dart/test/js_key_encoding_test.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import 'package:test/test.dart';
99
import 'utils/native_test_utils.dart';
1010

1111
void main() {
12-
final vfs = TestSqliteFileSystem(fs: const LocalFileSystem());
12+
// Needs a unique name per test file to avoid concurrency issues
13+
final vfs = TestSqliteFileSystem(
14+
fs: const LocalFileSystem(), name: 'js-key-encoding-test-vfs');
1315
late CommonDatabase db;
1416

1517
setUpAll(() {
@@ -19,7 +21,7 @@ void main() {
1921
tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));
2022

2123
setUp(() async {
22-
db = openTestDatabase(vfs)
24+
db = openTestDatabase(vfs: vfs)
2325
..select('select powersync_init();')
2426
..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
2527
});

0 commit comments

Comments (0)