Skip to content

Commit eec4dbb

Browse files
committed
Add checksum newtype
1 parent a21580a commit eec4dbb

File tree

8 files changed

+216
-19
lines changed

8 files changed

+216
-19
lines changed

crates/core/src/kv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ use sqlite::ResultCode;
88
use sqlite_nostd as sqlite;
99
use sqlite_nostd::{Connection, Context};
1010

11-
use crate::bucket_priority::BucketPriority;
1211
use crate::create_sqlite_optional_text_fn;
1312
use crate::create_sqlite_text_fn;
1413
use crate::error::SQLiteError;
14+
use crate::sync::BucketPriority;
1515

1616
fn powersync_client_id_impl(
1717
ctx: *mut sqlite::context,

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use core::ffi::{c_char, c_int};
1212
use sqlite::ResultCode;
1313
use sqlite_nostd as sqlite;
1414

15-
mod bucket_priority;
1615
mod checkpoint;
1716
mod crud_vtab;
1817
mod diff;
@@ -26,6 +25,7 @@ mod migrations;
2625
mod operations;
2726
mod operations_vtab;
2827
mod schema;
28+
mod sync;
2929
mod sync_local;
3030
mod sync_types;
3131
mod util;

crates/core/src/migrations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use sqlite::ResultCode;
88
use sqlite_nostd as sqlite;
99
use sqlite_nostd::{Connection, Context};
1010

11-
use crate::bucket_priority::BucketPriority;
1211
use crate::error::{PSResult, SQLiteError};
1312
use crate::fix_data::apply_v035_fix;
13+
use crate::sync::BucketPriority;
1414

1515
pub const LATEST_VERSION: i32 = 9;
1616

crates/core/src/operations.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use alloc::format;
22
use alloc::string::String;
3+
use num_traits::Zero;
34

45
use crate::error::{PSResult, SQLiteError};
6+
use crate::sync::Checksum;
57
use sqlite_nostd as sqlite;
68
use sqlite_nostd::{Connection, ResultCode};
79

@@ -101,16 +103,16 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
101103
bucket_statement.reset()?;
102104

103105
let mut last_op: Option<i64> = None;
104-
let mut add_checksum: i32 = 0;
105-
let mut op_checksum: i32 = 0;
106+
let mut add_checksum = Checksum::zero();
107+
let mut op_checksum = Checksum::zero();
106108
let mut added_ops: i32 = 0;
107109

108110
while iterate_statement.step()? == ResultCode::ROW {
109111
let op_id = iterate_statement.column_int64(0);
110112
let op = iterate_statement.column_text(1)?;
111113
let object_type = iterate_statement.column_text(2);
112114
let object_id = iterate_statement.column_text(3);
113-
let checksum = iterate_statement.column_int(4);
115+
let checksum = Checksum::from_i32(iterate_statement.column_int(4));
114116
let op_data = iterate_statement.column_text(5);
115117

116118
last_op = Some(op_id);
@@ -131,9 +133,9 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
131133

132134
while supersede_statement.step()? == ResultCode::ROW {
133135
// Superseded (deleted) a previous operation, add the checksum
134-
let supersede_checksum = supersede_statement.column_int(1);
135-
add_checksum = add_checksum.wrapping_add(supersede_checksum);
136-
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
136+
let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1));
137+
add_checksum += supersede_checksum;
138+
op_checksum -= supersede_checksum;
137139

138140
// Superseded an operation, only skip if the bucket was empty
139141
// Previously this checked "superseded_op <= last_applied_op".
@@ -149,7 +151,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
149151
if op == "REMOVE" {
150152
let should_skip_remove = !superseded;
151153

152-
add_checksum = add_checksum.wrapping_add(checksum);
154+
add_checksum += checksum;
153155

154156
if !should_skip_remove {
155157
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
@@ -190,12 +192,12 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
190192
insert_statement.bind_null(6)?;
191193
}
192194

193-
insert_statement.bind_int(7, checksum)?;
195+
insert_statement.bind_int(7, checksum.bitcast_i32())?;
194196
insert_statement.exec()?;
195197

196-
op_checksum = op_checksum.wrapping_add(checksum);
198+
op_checksum += checksum;
197199
} else if op == "MOVE" {
198-
add_checksum = add_checksum.wrapping_add(checksum);
200+
add_checksum += checksum;
199201
} else if op == "CLEAR" {
200202
// Any remaining PUT operations should get an implicit REMOVE
201203
// language=SQLite
@@ -223,12 +225,12 @@ WHERE bucket = ?1",
223225
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
224226
)?;
225227
clear_statement2.bind_int64(2, bucket_id)?;
226-
clear_statement2.bind_int(1, checksum)?;
228+
clear_statement2.bind_int(1, checksum.bitcast_i32())?;
227229
clear_statement2.exec()?;
228230

229-
add_checksum = 0;
231+
add_checksum = Checksum::zero();
230232
is_empty = true;
231-
op_checksum = 0;
233+
op_checksum = Checksum::zero();
232234
}
233235
}
234236

@@ -244,8 +246,8 @@ WHERE bucket = ?1",
244246
)?;
245247
statement.bind_int64(1, bucket_id)?;
246248
statement.bind_int64(2, *last_op)?;
247-
statement.bind_int(3, add_checksum)?;
248-
statement.bind_int(4, op_checksum)?;
249+
statement.bind_int(3, add_checksum.bitcast_i32())?;
250+
statement.bind_int(4, op_checksum.bitcast_i32())?;
249251
statement.bind_int(5, added_ops)?;
250252

251253
statement.exec()?;

crates/core/src/sync/checksum.rs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
use core::{
2+
fmt::Display,
3+
num::Wrapping,
4+
ops::{Add, AddAssign, Sub, SubAssign},
5+
};
6+
7+
use num_traits::float::FloatCore;
8+
use num_traits::Zero;
9+
use serde::{de::Visitor, Deserialize, Serialize};
10+
11+
/// A checksum as received from the sync service.
12+
///
13+
/// Conceptually, we use unsigned 32 bit integers to represent checksums, and adding checksums
14+
/// should be a wrapping add.
15+
#[repr(transparent)]
16+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
17+
pub struct Checksum(Wrapping<u32>);
18+
19+
impl Checksum {
20+
pub const fn value(self) -> u32 {
21+
self.0 .0
22+
}
23+
24+
pub const fn from_value(value: u32) -> Self {
25+
Self(Wrapping(value))
26+
}
27+
28+
pub const fn from_i32(value: i32) -> Self {
29+
Self::from_value(value as u32)
30+
}
31+
32+
pub const fn bitcast_i32(self) -> i32 {
33+
self.value() as i32
34+
}
35+
}
36+
37+
impl Zero for Checksum {
38+
fn zero() -> Self {
39+
const { Self::from_value(0) }
40+
}
41+
42+
fn is_zero(&self) -> bool {
43+
self.value() == 0
44+
}
45+
}
46+
47+
impl Add for Checksum {
48+
type Output = Self;
49+
50+
#[inline]
51+
fn add(self, rhs: Self) -> Self::Output {
52+
Self(self.0 + rhs.0)
53+
}
54+
}
55+
56+
impl AddAssign for Checksum {
57+
#[inline]
58+
fn add_assign(&mut self, rhs: Self) {
59+
self.0 += rhs.0
60+
}
61+
}
62+
63+
impl Sub for Checksum {
64+
type Output = Self;
65+
66+
fn sub(self, rhs: Self) -> Self::Output {
67+
Self(self.0 - rhs.0)
68+
}
69+
}
70+
71+
impl SubAssign for Checksum {
72+
fn sub_assign(&mut self, rhs: Self) {
73+
self.0 -= rhs.0;
74+
}
75+
}
76+
77+
impl From<u32> for Checksum {
78+
fn from(value: u32) -> Self {
79+
Self::from_value(value)
80+
}
81+
}
82+
83+
impl Display for Checksum {
84+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
85+
write!(f, "{:#010x}", self.value())
86+
}
87+
}
88+
89+
impl<'de> Deserialize<'de> for Checksum {
90+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
91+
where
92+
D: serde::Deserializer<'de>,
93+
{
94+
struct MyVisitor;
95+
96+
impl<'de> Visitor<'de> for MyVisitor {
97+
type Value = Checksum;
98+
99+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
100+
write!(formatter, "a number to interpret as a checksum")
101+
}
102+
103+
fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
104+
where
105+
E: serde::de::Error,
106+
{
107+
Ok(v.into())
108+
}
109+
110+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
111+
where
112+
E: serde::de::Error,
113+
{
114+
let as_u32: u32 = v.try_into().map_err(|_| {
115+
E::invalid_value(serde::de::Unexpected::Unsigned(v), &"a 32-bit int")
116+
})?;
117+
Ok(as_u32.into())
118+
}
119+
120+
fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
121+
where
122+
E: serde::de::Error,
123+
{
124+
Ok(Checksum::from_i32(v))
125+
}
126+
127+
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
128+
where
129+
E: serde::de::Error,
130+
{
131+
// This is supposed to be an u32, but it could also be a i32 that we need to
132+
// normalize.
133+
let min: i64 = u32::MIN.into();
134+
let max: i64 = u32::MAX.into();
135+
136+
if v >= min && v <= max {
137+
return Ok(Checksum::from(v as u32));
138+
}
139+
140+
let as_i32: i32 = v.try_into().map_err(|_| {
141+
E::invalid_value(serde::de::Unexpected::Signed(v), &"a 32-bit int")
142+
})?;
143+
Ok(Checksum::from_i32(as_i32))
144+
}
145+
146+
fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
147+
where
148+
E: serde::de::Error,
149+
{
150+
if !v.is_finite() || f64::trunc(v) != v {
151+
return Err(E::invalid_value(
152+
serde::de::Unexpected::Float(v),
153+
&"a whole number",
154+
));
155+
}
156+
157+
self.visit_i64(v as i64)
158+
}
159+
}
160+
161+
deserializer.deserialize_u32(MyVisitor)
162+
}
163+
}
164+
165+
#[cfg(test)]
166+
mod test {
167+
use super::Checksum;
168+
169+
#[test]
170+
pub fn test_binary_representation() {
171+
assert_eq!(Checksum::from_i32(-1).value(), u32::MAX);
172+
assert_eq!(Checksum::from(u32::MAX).value(), u32::MAX);
173+
assert_eq!(Checksum::from(u32::MAX).bitcast_i32(), -1);
174+
}
175+
176+
fn deserialize(from: &str) -> Checksum {
177+
serde_json::from_str(from).expect("should deserialize")
178+
}
179+
180+
#[test]
181+
pub fn test_deserialize() {
182+
assert_eq!(deserialize("0").value(), 0);
183+
assert_eq!(deserialize("-1").value(), u32::MAX);
184+
assert_eq!(deserialize("-1.0").value(), u32::MAX);
185+
186+
assert_eq!(deserialize("3573495687").value(), 3573495687);
187+
assert_eq!(deserialize("3573495687.0").value(), 3573495687);
188+
assert_eq!(deserialize("-721471609.0").value(), 3573495687);
189+
}
190+
}

crates/core/src/sync/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod bucket_priority;
2+
mod checksum;
3+
4+
pub use bucket_priority::BucketPriority;
5+
pub use checksum::Checksum;

crates/core/src/sync_local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use alloc::string::String;
44
use alloc::vec::Vec;
55
use serde::Deserialize;
66

7-
use crate::bucket_priority::BucketPriority;
87
use crate::error::{PSResult, SQLiteError};
8+
use crate::sync::BucketPriority;
99
use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
1010
use sqlite_nostd::{ColumnType, Connection, ResultCode};
1111

0 commit comments

Comments
 (0)