Skip to content

Commit 03db494

Browse files
committed
Move checksum validation to new types
1 parent ce130cc commit 03db494

File tree

7 files changed

+213
-69
lines changed

7 files changed

+213
-69
lines changed

crates/core/src/checkpoint.rs

Lines changed: 16 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ use sqlite_nostd::{Connection, Context, Value};
1313

1414
use crate::create_sqlite_text_fn;
1515
use crate::error::SQLiteError;
16-
use crate::sync_types::Checkpoint;
16+
use crate::sync::checkpoint::{validate_checkpoint, OwnedBucketChecksum};
17+
use crate::sync::line::Checkpoint;
1718

18-
#[derive(Serialize, Deserialize)]
19+
#[derive(Serialize)]
1920
struct CheckpointResult {
2021
valid: bool,
2122
failed_buckets: Vec<String>,
@@ -26,53 +27,23 @@ fn powersync_validate_checkpoint_impl(
2627
args: &[*mut sqlite::value],
2728
) -> Result<String, SQLiteError> {
2829
let data = args[0].text();
29-
30-
let _checkpoint: Checkpoint = serde_json::from_str(data)?;
31-
30+
let checkpoint: Checkpoint = serde_json::from_str(data)?;
3231
let db = ctx.db_handle();
33-
34-
// language=SQLite
35-
let statement = db.prepare_v2(
36-
"WITH
37-
bucket_list(bucket, checksum) AS (
38-
SELECT
39-
json_extract(json_each.value, '$.bucket') as bucket,
40-
json_extract(json_each.value, '$.checksum') as checksum
41-
FROM json_each(json_extract(?1, '$.buckets'))
42-
)
43-
SELECT
44-
bucket_list.bucket as bucket,
45-
IFNULL(buckets.add_checksum, 0) as add_checksum,
46-
IFNULL(buckets.op_checksum, 0) as oplog_checksum,
47-
bucket_list.checksum as expected_checksum
48-
FROM bucket_list
49-
LEFT OUTER JOIN ps_buckets AS buckets ON
50-
buckets.name = bucket_list.bucket
51-
GROUP BY bucket_list.bucket",
52-
)?;
53-
54-
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
55-
56-
let mut failures: Vec<String> = alloc::vec![];
57-
58-
while statement.step()? == ResultCode::ROW {
59-
let name = statement.column_text(0)?;
60-
// checksums with column_int are wrapped to i32 by SQLite
61-
let add_checksum = statement.column_int(1);
62-
let oplog_checksum = statement.column_int(2);
63-
let expected_checksum = statement.column_int(3);
64-
65-
// wrapping add is like +, but safely overflows
66-
let checksum = oplog_checksum.wrapping_add(add_checksum);
67-
68-
if checksum != expected_checksum {
69-
failures.push(String::from(name));
70-
}
32+
let buckets: Vec<OwnedBucketChecksum> = checkpoint
33+
.buckets
34+
.iter()
35+
.map(OwnedBucketChecksum::from)
36+
.collect();
37+
38+
let failures = validate_checkpoint(buckets.iter(), None, db)?;
39+
let mut failed_buckets = Vec::<String>::with_capacity(failures.len());
40+
for failure in failures {
41+
failed_buckets.push(failure.bucket_name);
7142
}
7243

7344
let result = CheckpointResult {
74-
valid: failures.is_empty(),
75-
failed_buckets: failures,
45+
valid: failed_buckets.is_empty(),
46+
failed_buckets: failed_buckets,
7647
};
7748

7849
Ok(json::to_string(&result)?)

crates/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ mod operations_vtab;
2727
mod schema;
2828
mod sync;
2929
mod sync_local;
30-
mod sync_types;
3130
mod util;
3231
mod uuid;
3332
mod version;

crates/core/src/sync/bucket_priority.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use sqlite_nostd::ResultCode;
44
use crate::error::SQLiteError;
55

66
#[repr(transparent)]
7-
#[derive(Clone, Copy, PartialEq, Eq)]
7+
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
88
pub struct BucketPriority {
99
pub number: i32,
1010
}
@@ -14,6 +14,8 @@ impl BucketPriority {
1414
self == BucketPriority::HIGHEST
1515
}
1616

17+
/// The priority to use when the sync service doesn't attach priorities in checkpoints.
18+
pub const FALLBACK: BucketPriority = BucketPriority { number: 3 };
1719
pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };
1820

1921
/// A low priority used to represent fully-completed sync operations across all priorities.

crates/core/src/sync/checkpoint.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use alloc::{string::String, vec::Vec};
2+
use num_traits::Zero;
3+
use serde::Deserialize;
4+
5+
use crate::{
6+
error::SQLiteError,
7+
sync::{
8+
line::{BucketChecksum, Checkpoint},
9+
BucketPriority, Checksum,
10+
},
11+
};
12+
use sqlite_nostd::{self as sqlite, Connection, ResultCode};
13+
14+
/// Owned counterpart of a `BucketChecksum`: the bucket name is stored as a `String`, so the value
/// does not borrow from the deserialized input.
///
/// Instances are produced by the `From<&BucketChecksum>` conversion and are the items that
/// `validate_checkpoint` iterates over.
#[derive(Debug, Clone)]
15+
pub struct OwnedBucketChecksum {
16+
/// Bucket name; used as the `ps_buckets.name` lookup key during validation.
pub bucket: String,
17+
/// Checksum the bucket is expected to have (compared against the locally stored checksums).
pub checksum: Checksum,
18+
/// Priority of the bucket; `BucketPriority::FALLBACK` when the source carried none.
pub priority: BucketPriority,
19+
/// Optional count copied verbatim from the source `BucketChecksum`.
// NOTE(review): presumably the number of operations in the bucket - confirm.
pub count: Option<i64>,
20+
}
21+
22+
impl OwnedBucketChecksum {
23+
/// Returns whether this bucket passes the given priority filter.
///
/// `None` means "no filter" and always yields `true`. With `Some(prio)` the result is
/// `self.priority >= prio`.
// NOTE(review): the direction of `>=` is decided by the `PartialOrd` impl for `BucketPriority`,
// which is not part of this code - confirm which side counts as "higher" priority.
pub fn is_in_priority(&self, prio: Option<BucketPriority>) -> bool {
24+
match prio {
25+
None => true,
26+
Some(prio) => self.priority >= prio,
27+
}
28+
}
29+
}
30+
31+
/// Converts a (possibly borrowing) `BucketChecksum` into an `OwnedBucketChecksum`.
impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {
32+
/// Copies all fields, turning the bucket name into an owned `String` and replacing a missing
/// priority with `BucketPriority::FALLBACK`.
fn from(value: &'_ BucketChecksum<'_>) -> Self {
33+
Self {
34+
// `bucket` is a `Cow<str>`; clone it and take ownership of the text.
bucket: value.bucket.clone().into_owned(),
35+
checksum: value.checksum,
36+
// Checkpoints without an explicit priority get the fallback priority.
priority: value.priority.unwrap_or(BucketPriority::FALLBACK),
37+
count: value.count,
38+
}
39+
}
40+
}
41+
42+
/// Describes one bucket whose locally stored checksums do not add up to the expected checksum.
///
/// Produced by `validate_checkpoint` for every bucket that fails validation.
pub struct ChecksumMismatch {
43+
/// Name of the bucket that failed validation.
pub bucket_name: String,
44+
/// Checksum the checkpoint expected for the bucket.
pub expected_checksum: Checksum,
45+
/// Value read from `ps_buckets.op_checksum` (zero when no row was found).
pub actual_op_checksum: Checksum,
46+
/// Value read from `ps_buckets.add_checksum` (zero when no row was found).
pub actual_add_checksum: Checksum,
47+
}
48+
49+
/// Compares the expected checksum of each bucket against the checksums stored in `ps_buckets`.
///
/// * `buckets` - the buckets (with expected checksums) to validate.
/// * `priority` - optional filter; buckets for which `is_in_priority(priority)` is `false` are
///   skipped entirely. `None` validates every bucket.
/// * `db` - raw SQLite connection handle used to prepare and run the lookup statement.
///
/// Returns one `ChecksumMismatch` per bucket where `add_checksum + op_checksum` differs from the
/// expected checksum; an empty vector means every checked bucket matched.
///
/// # Errors
///
/// Propagates any error from preparing, binding, stepping or resetting the statement.
pub fn validate_checkpoint<'a>(
50+
buckets: impl Iterator<Item = &'a OwnedBucketChecksum>,
51+
priority: Option<BucketPriority>,
52+
db: *mut sqlite::sqlite3,
53+
) -> Result<Vec<ChecksumMismatch>, SQLiteError> {
54+
// The statement is prepared once and re-bound for every bucket.
// language=SQLite
55+
let statement = db.prepare_v2(
56+
"
57+
SELECT
58+
ps_buckets.add_checksum as add_checksum,
59+
ps_buckets.op_checksum as oplog_checksum
60+
FROM ps_buckets WHERE name = ?;",
61+
)?;
62+
63+
let mut failures: Vec<ChecksumMismatch> = Vec::new();
64+
for bucket in buckets {
65+
if bucket.is_in_priority(priority) {
66+
// NOTE(review): `Destructor::STATIC` presumably maps to SQLITE_STATIC, i.e. SQLite keeps a
// pointer to the text instead of copying it. The name is borrowed for `'a`, which outlives
// this function - confirm the binding is never used after the bucket is dropped.
statement.bind_text(1, &bucket.bucket, sqlite_nostd::Destructor::STATIC)?;
67+
68+
let (add_checksum, oplog_checksum) = match statement.step()? {
69+
ResultCode::ROW => {
70+
let add_checksum = Checksum::from_i32(statement.column_int(0));
71+
let oplog_checksum = Checksum::from_i32(statement.column_int(1));
72+
(add_checksum, oplog_checksum)
73+
}
74+
// Any non-ROW result (e.g. no matching row in `ps_buckets`) is treated as a bucket with
// zero checksums.
_ => (Checksum::zero(), Checksum::zero()),
75+
};
76+
77+
// The checksum to compare is the sum of both stored parts.
let actual = add_checksum + oplog_checksum;
78+
79+
if actual != bucket.checksum {
80+
failures.push(ChecksumMismatch {
81+
bucket_name: bucket.bucket.clone(),
82+
expected_checksum: bucket.checksum,
83+
actual_add_checksum: add_checksum,
84+
actual_op_checksum: oplog_checksum,
85+
});
86+
}
87+
88+
// Reset so the statement can be bound and stepped again for the next bucket.
statement.reset()?;
89+
}
90+
}
91+
92+
Ok(failures)
93+
}

crates/core/src/sync/line.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use alloc::borrow::Cow;
2+
use alloc::string::String;
3+
use alloc::string::ToString;
4+
use alloc::vec::Vec;
5+
use serde::Deserialize;
6+
7+
use super::BucketPriority;
8+
use super::Checksum;
9+
10+
use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64};
11+
12+
/// String type used for text fields of sync lines: borrowed from the input when possible, owned
/// otherwise.
///
/// While we would like to always borrow strings for efficiency, that's not consistently possible.
13+
/// With the JSON decoder, borrowing from input data is only possible when the string contains no
14+
/// escape sequences (otherwise, the string is not a direct view of input data and we need an
15+
/// internal copy).
16+
type SyncLineStr<'a> = Cow<'a, str>;
17+
18+
/// A checkpoint as deserialized from a sync line; may borrow bucket names from the input (`'a`).
#[derive(Deserialize, Debug)]
19+
pub struct Checkpoint<'a> {
20+
/// Parsed with `deserialize_string_to_i64`.
// NOTE(review): presumably the id arrives as a string-encoded integer - confirm in `util`.
#[serde(deserialize_with = "deserialize_string_to_i64")]
21+
pub last_op_id: i64,
22+
/// Optional; `None` when the key is absent (`serde(default)`). Parsed with
/// `deserialize_optional_string_to_i64`.
#[serde(default)]
23+
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
24+
pub write_checkpoint: Option<i64>,
25+
/// Expected checksum (and optional priority/count) for each bucket in this checkpoint.
#[serde(borrow)]
26+
pub buckets: Vec<BucketChecksum<'a>>,
27+
}
28+
29+
/// Per-bucket entry of a `Checkpoint`: the bucket name plus the checksum it is expected to have.
#[derive(Deserialize, Debug)]
30+
pub struct BucketChecksum<'a> {
31+
/// Bucket name, borrowed from the input when the decoder allows it.
#[serde(borrow)]
32+
pub bucket: SyncLineStr<'a>,
33+
/// Expected checksum for the bucket.
pub checksum: Checksum,
34+
/// Optional priority; `None` when the key is absent.
#[serde(default)]
35+
pub priority: Option<BucketPriority>,
36+
/// Optional count; `None` when the key is absent.
// NOTE(review): presumably the number of operations in the bucket - confirm.
#[serde(default)]
37+
pub count: Option<i64>,
38+
// Not deserialized at the moment; kept as a placeholder for a per-bucket last op id.
// #[serde(default)]
39+
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
40+
// pub last_op_id: Option<i64>,
41+
}
42+
43+
/// A data line: a list of oplog entries belonging to a single bucket.
#[derive(Deserialize, Debug)]
44+
pub struct DataLine<'a> {
45+
/// Name of the bucket the entries belong to, borrowed from the input when possible.
#[serde(borrow)]
46+
pub bucket: SyncLineStr<'a>,
47+
/// The oplog entries carried by this line.
pub data: Vec<OplogEntry<'a>>,
48+
// The fields below are currently not deserialized.
// #[serde(default)]
49+
// pub has_more: bool,
50+
// #[serde(default, borrow)]
51+
// pub after: Option<SyncLineStr<'a>>,
52+
// #[serde(default, borrow)]
53+
// pub next_after: Option<SyncLineStr<'a>>,
54+
}
55+
56+
/// A single entry of a `DataLine`'s `data` list.
///
/// All `Option` fields are `None` when their key is absent from the input (`serde(default)`).
#[derive(Deserialize, Debug)]
57+
pub struct OplogEntry<'a> {
58+
/// Checksum of this entry.
pub checksum: Checksum,
59+
/// Parsed with `deserialize_string_to_i64`.
// NOTE(review): presumably a string-encoded integer on the wire - confirm in `util`.
#[serde(deserialize_with = "deserialize_string_to_i64")]
60+
pub op_id: i64,
61+
/// Kind of operation this entry represents.
pub op: OpType,
62+
/// Optional object id.
#[serde(default, borrow)]
63+
pub object_id: Option<SyncLineStr<'a>>,
64+
/// Optional object type.
#[serde(default, borrow)]
65+
pub object_type: Option<SyncLineStr<'a>>,
66+
/// Optional subkey.
#[serde(default, borrow)]
67+
pub subkey: Option<SyncLineStr<'a>>,
68+
/// Optional payload; decoded through the manual `Deserialize` impl of `OplogData`.
#[serde(default, borrow)]
69+
pub data: Option<OplogData<'a>>,
70+
}
71+
72+
/// Payload of an `OplogEntry`.
///
/// `Deserialize` is implemented by hand (not derived); currently only the JSON-string form exists.
#[derive(Debug)]
73+
pub enum OplogData<'a> {
74+
/// A string encoding a well-formed JSON object representing values of the row.
75+
Json { data: Cow<'a, str> },
76+
// Possible future variant for BSON-encoded payloads; not enabled.
// BsonDocument { data: Cow<'a, [u8]> },
77+
}
78+
79+
/// Kind of operation carried by an `OplogEntry` (its `op` field).
///
/// Derived `Deserialize` without renaming, so the input must spell the variant names exactly as
/// written here (upper case).
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
80+
pub enum OpType {
81+
CLEAR,
82+
MOVE,
83+
PUT,
84+
REMOVE,
85+
}
86+
87+
/// Manual `Deserialize` impl: the value is read as a string and wrapped in `OplogData::Json`.
///
/// The `'de: 'a` bound allows the resulting `Cow` to reference the deserializer's input.
impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> {
88+
/// Deserializes a `Cow<str>` from `deserializer` and returns it as `OplogData::Json`.
///
/// Any error from deserializing the inner string is propagated unchanged.
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
89+
where
90+
D: serde::Deserializer<'de>,
91+
{
92+
// For now, we will always get oplog data as a string. In the future, there may be the
93+
// option of the sync service sending BSON-encoded data lines too, but that's not relevant
94+
// for now.
95+
// NOTE(review): serde's plain `Deserialize` impl for `Cow` is documented to produce
// `Cow::Owned`; zero-copy borrowing is only wired up via `#[serde(borrow)]` on derived fields.
// This call therefore probably always allocates - confirm, and borrow explicitly if intended.
return Ok(OplogData::Json {
96+
data: Deserialize::deserialize(deserializer)?,
97+
});
98+
}
99+
}

crates/core/src/sync/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod bucket_priority;
2+
pub mod checkpoint;
23
mod checksum;
4+
pub mod line;
35

46
pub use bucket_priority::BucketPriority;
57
pub use checksum::Checksum;

crates/core/src/sync_types.rs

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)