Skip to content

Commit 7b9fb06

Browse files
committed
Don't store priorities
1 parent f2cc286 commit 7b9fb06

9 files changed

+127
-138
lines changed

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ default-members = ["crates/shell", "crates/sqlite"]
99

1010
[profile.dev]
1111
panic = "abort"
12-
strip = true
1312

1413
[profile.release]
1514
panic = "abort"

crates/core/src/bucket_priority.rs

+42-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use core::ops::RangeInclusive;
22

3+
use serde::{de::Visitor, Deserialize};
34
use sqlite_nostd::ResultCode;
45

56
use crate::error::SQLiteError;
@@ -34,12 +35,6 @@ impl TryFrom<i32> for BucketPriority {
3435
}
3536
}
3637

37-
impl Default for BucketPriority {
38-
fn default() -> Self {
39-
Self(1)
40-
}
41-
}
42-
4338
impl Into<i32> for BucketPriority {
4439
fn into(self) -> i32 {
4540
self.0
@@ -51,3 +46,44 @@ impl PartialOrd<BucketPriority> for BucketPriority {
5146
Some(self.0.partial_cmp(&other.0)?.reverse())
5247
}
5348
}
49+
50+
impl<'de> Deserialize<'de> for BucketPriority {
51+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
52+
where
53+
D: serde::Deserializer<'de>,
54+
{
55+
struct PriorityVisitor;
56+
impl<'de> Visitor<'de> for PriorityVisitor {
57+
type Value = BucketPriority;
58+
59+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
60+
formatter.write_str("a priority as an integer between 0 and 3 (inclusive)")
61+
}
62+
63+
fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
64+
where
65+
E: serde::de::Error,
66+
{
67+
BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default()))
68+
}
69+
70+
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
71+
where
72+
E: serde::de::Error,
73+
{
74+
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
75+
Self::visit_i32(self, i)
76+
}
77+
78+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
79+
where
80+
E: serde::de::Error,
81+
{
82+
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
83+
Self::visit_i32(self, i)
84+
}
85+
}
86+
87+
deserializer.deserialize_i32(PriorityVisitor)
88+
}
89+
}

crates/core/src/migrations.rs

-16
Original file line numberDiff line numberDiff line change
@@ -310,21 +310,5 @@ json_array(
310310
.into_db_result(local_db)?;
311311
}
312312

313-
if current_version < 7 && target_version >= 7 {
314-
local_db
315-
.exec_safe(
316-
"\
317-
ALTER TABLE ps_buckets ADD COLUMN priority INTEGER NOT NULL DEFAULT 1;
318-
INSERT INTO ps_migration(id, down_migrations)
319-
VALUES(7,
320-
json_array(
321-
json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN priority'),
322-
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
323-
));
324-
",
325-
)
326-
.into_db_result(local_db)?;
327-
}
328-
329313
Ok(())
330314
}

crates/core/src/operations.rs

+4-15
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use alloc::format;
22
use alloc::string::String;
33

4-
use crate::bucket_priority::BucketPriority;
54
use crate::error::{PSResult, SQLiteError};
65
use sqlite_nostd as sqlite;
76
use sqlite_nostd::{Connection, ResultCode};
@@ -18,8 +17,7 @@ SELECT
1817
json_extract(e.value, '$.data') as data,
1918
json_extract(e.value, '$.has_more') as has_more,
2019
json_extract(e.value, '$.after') as after,
21-
json_extract(e.value, '$.next_after') as next_after,
22-
json_extract(d.value, '$.priority') as priority
20+
json_extract(e.value, '$.next_after') as next_after
2321
FROM json_each(json_extract(?1, '$.buckets')) e
2422
LEFT OUTER JOIN json_each(json_extract(?1, '$.descriptions')) d
2523
ON json_extract(e.value, '$.bucket') == d.key",
@@ -32,15 +30,8 @@ FROM json_each(json_extract(?1, '$.buckets')) e
3230
// let _has_more = statement.column_int(2)? != 0;
3331
// let _after = statement.column_text(3)?;
3432
// let _next_after = statement.column_text(4)?;
35-
let priority = match statement.column_type(5)? {
36-
sqlite_nostd::ColumnType::Integer => {
37-
BucketPriority::try_from(statement.column_int(5)?).ok()
38-
}
39-
_ => None,
40-
}
41-
.unwrap_or_default();
4233

43-
insert_bucket_operations(db, bucket, data, priority)?;
34+
insert_bucket_operations(db, bucket, data)?;
4435
}
4536

4637
Ok(())
@@ -50,7 +41,6 @@ pub fn insert_bucket_operations(
5041
db: *mut sqlite::sqlite3,
5142
bucket: &str,
5243
data: &str,
53-
priority: BucketPriority,
5444
) -> Result<(), SQLiteError> {
5545
// Statement to insert new operations (only for PUT and REMOVE).
5646
// language=SQLite
@@ -72,14 +62,13 @@ FROM json_each(?) e",
7262
// We can consider splitting this into separate SELECT and INSERT statements.
7363
// language=SQLite
7464
let bucket_statement = db.prepare_v2(
75-
"INSERT INTO ps_buckets(name, priority)
76-
VALUES(?, ?)
65+
"INSERT INTO ps_buckets(name)
66+
VALUES(?)
7767
ON CONFLICT DO UPDATE
7868
SET last_applied_op = last_applied_op
7969
RETURNING id, last_applied_op",
8070
)?;
8171
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
82-
bucket_statement.bind_int(2, priority.into())?;
8372
bucket_statement.step()?;
8473

8574
let bucket_id = bucket_statement.column_int64(0)?;

crates/core/src/operations_vtab.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ extern "C" fn update(
8484
let result = insert_operation(db, args[3].text());
8585
vtab_result(vtab, result)
8686
} else if op == "sync_local" {
87-
let result = sync_local(db, &args[3]);
87+
let result = sync_local(db, args[3]);
8888
if let Ok(result_row) = result {
8989
unsafe {
9090
*p_row_id = result_row;

crates/core/src/sync_local.rs

+62-37
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use alloc::collections::BTreeSet;
22
use alloc::format;
33
use alloc::string::String;
4+
use alloc::vec::Vec;
5+
use serde::Deserialize;
46

57
use crate::bucket_priority::BucketPriority;
68
use crate::error::{PSResult, SQLiteError};
@@ -14,30 +16,25 @@ fn can_apply_sync_changes(
1416
db: *mut sqlite::sqlite3,
1517
priority: BucketPriority,
1618
) -> Result<bool, SQLiteError> {
17-
// We can only make sync changes visible if data is consistent, meaning that we've seen the
18-
// target operation sent in the original checkpoint message. We allow weakening consistency when
19-
// buckets from different priorities are involved (buckets with higher priorities or a lower
20-
// priority number can be published before we've reached the checkpoint for other buckets).
21-
// language=SQLite
22-
let statement = db.prepare_v2(
23-
"\
19+
// Don't publish downloaded data until the upload queue is empty (except for downloaded data in
20+
// priority 0, which is published earlier).
21+
if !priority.may_publish_with_outstanding_uploads() {
22+
// language=SQLite
23+
let statement = db.prepare_v2(
24+
"\
2425
SELECT group_concat(name)
2526
FROM ps_buckets
26-
WHERE (target_op > last_op) AND (priority <= ?)",
27-
)?;
28-
statement.bind_int(1, priority.into())?;
27+
WHERE target_op > last_op AND name = '$local'",
28+
)?;
2929

30-
if statement.step()? != ResultCode::ROW {
31-
return Err(SQLiteError::from(ResultCode::ABORT));
32-
}
30+
if statement.step()? != ResultCode::ROW {
31+
return Err(SQLiteError::from(ResultCode::ABORT));
32+
}
3333

34-
if statement.column_type(0)? == ColumnType::Text {
35-
return Ok(false);
36-
}
34+
if statement.column_type(0)? == ColumnType::Text {
35+
return Ok(false);
36+
}
3737

38-
// Don't publish downloaded data until the upload queue is empty (except for downloaded data in
39-
// priority 0, which is published earlier).
40-
if !priority.may_publish_with_outstanding_uploads() {
4138
let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
4239
if statement.step()? != ResultCode::DONE {
4340
return Ok(false);
@@ -47,13 +44,27 @@ WHERE (target_op > last_op) AND (priority <= ?)",
4744
Ok(true)
4845
}
4946

50-
pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> {
51-
let priority = match data.value_type() {
52-
ColumnType::Integer => BucketPriority::try_from(data.int()),
53-
ColumnType::Float => BucketPriority::try_from(data.double() as i32),
54-
// Older clients without bucket priority support typically send an empty string here.
55-
_ => Ok(BucketPriority::LOWEST),
56-
}?;
47+
pub fn sync_local(db: *mut sqlite::sqlite3, data: *mut sqlite::value) -> Result<i64, SQLiteError> {
48+
#[derive(Deserialize)]
49+
struct SyncLocalArguments {
50+
#[serde(rename = "buckets")]
51+
_buckets: Vec<String>,
52+
priority: Option<BucketPriority>,
53+
}
54+
55+
const FALLBACK_PRIORITY: BucketPriority = BucketPriority::LOWEST;
56+
let (has_args, priority) = match data.value_type() {
57+
ColumnType::Text => {
58+
let text = data.text();
59+
if text.len() > 0 {
60+
let args: SyncLocalArguments = serde_json::from_str(text)?;
61+
(true, args.priority.unwrap_or(FALLBACK_PRIORITY))
62+
} else {
63+
(false, FALLBACK_PRIORITY)
64+
}
65+
}
66+
_ => (false, FALLBACK_PRIORITY),
67+
};
5768

5869
if !can_apply_sync_changes(db, priority)? {
5970
return Ok(0);
@@ -78,12 +89,17 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, S
7889
"\
7990
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
8091
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
81-
WITH updated_rows AS (
82-
SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
83-
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
84-
WHERE buckets.priority <= ?1
85-
UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
86-
)
92+
WITH
93+
involved_buckets (id) AS (
94+
SELECT id FROM ps_buckets WHERE ?1 IS NULL
95+
OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
96+
),
97+
updated_rows AS (
98+
SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
99+
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
100+
WHERE buckets.id IN (SELECT id FROM involved_buckets)
101+
UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
102+
)
87103
88104
-- 3. Group the objects from different buckets together into a single one (ops).
89105
SELECT b.row_type as type,
@@ -98,15 +114,19 @@ FROM updated_rows b
98114
LEFT OUTER JOIN ps_oplog AS r
99115
ON r.row_type = b.row_type
100116
AND r.row_id = b.row_id
101-
AND (SELECT priority FROM ps_buckets WHERE id = r.bucket) <= ?1
117+
AND r.bucket IN (SELECT id FROM involved_buckets)
102118
-- Group for (3)
103119
GROUP BY b.row_type, b.row_id",
104120
)
105121
.into_db_result(db)?;
106122

107-
// TODO: cache statements
123+
if has_args {
124+
statement.bind_value(1, data)?;
125+
} else {
126+
statement.bind_null(1)?;
127+
}
108128

109-
statement.bind_int(1, priority.into())?;
129+
// TODO: cache statements
110130
while statement.step().into_db_result(db)? == ResultCode::ROW {
111131
let type_name = statement.column_text(0)?;
112132
let id = statement.column_text(1)?;
@@ -170,10 +190,15 @@ GROUP BY b.row_type, b.row_id",
170190
.prepare_v2(
171191
"UPDATE ps_buckets
172192
SET last_applied_op = last_op
173-
WHERE last_applied_op != last_op AND priority <= ?",
193+
WHERE last_applied_op != last_op AND
194+
(?1 IS NULL OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))))",
174195
)
175196
.into_db_result(db)?;
176-
updated.bind_int(1, priority.into())?;
197+
if has_args {
198+
updated.bind_value(1, data)?;
199+
} else {
200+
updated.bind_null(1)?;
201+
}
177202
updated.exec()?;
178203

179204
if priority == BucketPriority::LOWEST {

dart/test/sync_test.dart

+10-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,16 @@ void main() {
7575
],
7676
);
7777

78-
db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
79-
['sync_local', priority]);
78+
db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [
79+
'sync_local',
80+
jsonEncode({
81+
'priority': priority,
82+
'buckets': [
83+
for (final cs in checksums.cast<Map<String, dynamic>>())
84+
if (priority == null || cs['priority'] <= priority) cs['bucket']
85+
],
86+
})
87+
]);
8088
return db.lastInsertRowId == 1;
8189
}
8290

dart/test/utils/fix_035_fixtures.dart

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ const dataBroken = '''
1818

1919
/// Data after applying the migration fix, but before sync_local
2020
const dataMigrated = '''
21-
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES
22-
(1, 'b1', 0, 0, 0, 0, 120, 0, 1),
23-
(2, 'b2', 0, 0, 0, 0, 3, 0, 1)
21+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
22+
(1, 'b1', 0, 0, 0, 0, 120, 0),
23+
(2, 'b2', 0, 0, 0, 0, 3, 0)
2424
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
2525
(1, 1, 'todos', 't1', '', '{}', 100),
2626
(1, 2, 'todos', 't2', '', '{}', 20),
@@ -39,9 +39,9 @@ const dataMigrated = '''
3939

4040
/// Data after applying the migration fix and sync_local
4141
const dataFixed = '''
42-
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, priority) VALUES
43-
(1, 'b1', 0, 0, 0, 0, 120, 0, 1),
44-
(2, 'b2', 0, 0, 0, 0, 3, 0, 1)
42+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
43+
(1, 'b1', 0, 0, 0, 0, 120, 0),
44+
(2, 'b2', 0, 0, 0, 0, 3, 0)
4545
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
4646
(1, 1, 'todos', 't1', '', '{}', 100),
4747
(1, 2, 'todos', 't2', '', '{}', 20),

0 commit comments

Comments
 (0)