Skip to content

Commit 82bca23

Browse files
committed
Add serde types for sync lines
1 parent 32384f7 commit 82bca23

File tree

6 files changed

+235
-25
lines changed

6 files changed

+235
-25
lines changed

crates/core/src/bucket_priority.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use sqlite_nostd::ResultCode;
44
use crate::error::SQLiteError;
55

66
#[repr(transparent)]
7-
#[derive(Clone, Copy, PartialEq, Eq)]
7+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
88
pub struct BucketPriority {
99
pub number: i32,
1010
}

crates/core/src/checkpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use sqlite_nostd::{Connection, Context, Value};
1313

1414
use crate::create_sqlite_text_fn;
1515
use crate::error::SQLiteError;
16-
use crate::sync_types::Checkpoint;
16+
use crate::sync::line::Checkpoint;
1717

1818
#[derive(Serialize, Deserialize)]
1919
struct CheckpointResult {

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ mod migrations;
2626
mod operations;
2727
mod operations_vtab;
2828
mod schema;
29+
mod sync;
2930
mod sync_local;
30-
mod sync_types;
3131
mod util;
3232
mod uuid;
3333
mod version;

crates/core/src/sync/line.rs

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
use alloc::string::String;
2+
use alloc::vec::Vec;
3+
use serde::Deserialize;
4+
5+
use crate::{
6+
bucket_priority::BucketPriority,
7+
util::{deserialize_optional_string_to_i64, deserialize_string_to_i64},
8+
};
9+
10+
#[derive(Deserialize, Debug)]
11+
12+
pub enum SyncLine {
13+
#[serde(rename = "checkpoint")]
14+
Checkpoint(Checkpoint),
15+
#[serde(rename = "checkpoint_diff")]
16+
CheckpointDiff(CheckpointDiff),
17+
18+
#[serde(rename = "checkpoint_complete")]
19+
CheckpointComplete(CheckpointComplete),
20+
#[serde(rename = "partial_checkpoint_complete")]
21+
CheckpointPartiallyComplete(CheckpointPartiallyComplete),
22+
23+
#[serde(rename = "data")]
24+
Data(DataLine),
25+
26+
#[serde(rename = "token_expires_in")]
27+
KeepAlive(TokenExpiresIn),
28+
}
29+
30+
#[derive(Deserialize, Debug)]
31+
pub struct Checkpoint {
32+
#[serde(deserialize_with = "deserialize_string_to_i64")]
33+
pub last_op_id: i64,
34+
#[serde(default)]
35+
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
36+
pub write_checkpoint: Option<i64>,
37+
pub buckets: Vec<BucketChecksum>,
38+
}
39+
40+
#[derive(Deserialize, Debug)]
41+
pub struct CheckpointDiff {
42+
#[serde(deserialize_with = "deserialize_string_to_i64")]
43+
pub last_op_id: i64,
44+
pub updated_buckets: Vec<BucketChecksum>,
45+
pub removed_buckets: Vec<String>,
46+
pub write_checkpoint: Option<String>,
47+
}
48+
49+
#[derive(Deserialize, Debug)]
50+
pub struct CheckpointComplete {
51+
#[serde(deserialize_with = "deserialize_string_to_i64")]
52+
pub last_op_id: i64,
53+
}
54+
55+
#[derive(Deserialize, Debug)]
56+
pub struct CheckpointPartiallyComplete {
57+
#[serde(deserialize_with = "deserialize_string_to_i64")]
58+
pub last_op_id: i64,
59+
pub priority: BucketPriority,
60+
}
61+
62+
#[derive(Deserialize, Debug)]
63+
pub struct BucketChecksum {
64+
pub bucket: String,
65+
pub checksum: i32,
66+
pub priority: Option<BucketPriority>,
67+
pub count: Option<i64>,
68+
#[serde(default)]
69+
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
70+
pub last_op_id: Option<i64>,
71+
}
72+
73+
#[derive(Deserialize, Debug)]
74+
pub struct DataLine {
75+
pub bucket: String,
76+
pub data: Vec<OplogEntry>,
77+
#[serde(default)]
78+
pub has_more: bool,
79+
#[serde(default)]
80+
pub after: Option<String>,
81+
#[serde(default)]
82+
pub next_after: Option<String>,
83+
}
84+
85+
#[derive(Deserialize, Debug)]
86+
pub struct OplogEntry {
87+
pub checksum: i32,
88+
#[serde(deserialize_with = "deserialize_string_to_i64")]
89+
pub op_id: i64,
90+
pub op: OpType,
91+
#[serde(default)]
92+
pub object_id: Option<String>,
93+
#[serde(default)]
94+
pub object_type: Option<String>,
95+
#[serde(default)]
96+
pub subkey: Option<String>,
97+
// TODO: BSON?
98+
#[serde(default)]
99+
pub data: Option<String>,
100+
}
101+
102+
#[derive(Deserialize, Debug, Clone, Copy)]
103+
pub enum OpType {
104+
CLEAR,
105+
MOVE,
106+
PUT,
107+
REMOVE,
108+
}
109+
110+
#[derive(Deserialize, Debug)]
111+
pub struct TokenExpiresIn(pub i32);
112+
113+
#[cfg(test)]
114+
mod tests {
115+
use core::assert_matches::assert_matches;
116+
117+
use alloc::vec;
118+
119+
use super::*;
120+
121+
fn deserialize(source: &str) -> SyncLine {
122+
serde_json::from_str(source).expect("Should have deserialized")
123+
}
124+
125+
#[test]
126+
fn parse_token_expires_in() {
127+
assert_matches!(
128+
deserialize(r#"{"token_expires_in": 123}"#),
129+
SyncLine::KeepAlive(TokenExpiresIn(123))
130+
);
131+
}
132+
133+
#[test]
134+
fn parse_checkpoint() {
135+
assert_matches!(
136+
deserialize(r#"{"checkpoint": {"last_op_id": "10", "buckets": []}}"#),
137+
SyncLine::Checkpoint(Checkpoint {
138+
last_op_id: 10,
139+
write_checkpoint: None,
140+
buckets: _,
141+
})
142+
);
143+
144+
let SyncLine::Checkpoint(checkpoint) = deserialize(
145+
r#"{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": 10}]}}"#,
146+
) else {
147+
panic!("Expected checkpoint");
148+
};
149+
150+
assert_eq!(checkpoint.buckets.len(), 1);
151+
let bucket = &checkpoint.buckets[0];
152+
assert_eq!(bucket.bucket, "a");
153+
assert_eq!(bucket.checksum, 10);
154+
assert_eq!(bucket.priority, None);
155+
156+
let SyncLine::Checkpoint(checkpoint) = deserialize(
157+
r#"{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "priority": 1, "checksum": 10}]}}"#,
158+
) else {
159+
panic!("Expected checkpoint");
160+
};
161+
162+
assert_eq!(checkpoint.buckets.len(), 1);
163+
let bucket = &checkpoint.buckets[0];
164+
assert_eq!(bucket.bucket, "a");
165+
assert_eq!(bucket.checksum, 10);
166+
assert_eq!(bucket.priority, Some(BucketPriority { number: 1 }));
167+
}
168+
169+
#[test]
170+
fn parse_checkpoint_diff() {
171+
let SyncLine::CheckpointDiff(diff) = deserialize(
172+
r#"{"checkpoint_diff": {"last_op_id": "10", "buckets": [], "updated_buckets": [], "removed_buckets": []}}"#,
173+
) else {
174+
panic!("Expected checkpoint diff")
175+
};
176+
177+
assert_eq!(diff.updated_buckets.len(), 0);
178+
assert_eq!(diff.removed_buckets.len(), 0);
179+
}
180+
181+
#[test]
182+
fn parse_checkpoint_complete() {
183+
assert_matches!(
184+
deserialize(r#"{"checkpoint_complete": {"last_op_id": "10"}}"#),
185+
SyncLine::CheckpointComplete(CheckpointComplete { last_op_id: 10 })
186+
);
187+
}
188+
189+
#[test]
190+
fn parse_checkpoint_partially_complete() {
191+
assert_matches!(
192+
deserialize(r#"{"partial_checkpoint_complete": {"last_op_id": "10", "priority": 1}}"#),
193+
SyncLine::CheckpointPartiallyComplete(CheckpointPartiallyComplete {
194+
last_op_id: 10,
195+
priority: BucketPriority { number: 1 }
196+
})
197+
);
198+
}
199+
200+
#[test]
201+
fn parse_data() {
202+
let SyncLine::Data(data) = deserialize(
203+
r#"{"data": {
204+
"bucket": "bkt",
205+
"data": [{"checksum":10,"op_id":"1","object_id":"test","object_type":"users","op":"PUT","subkey":null,"data":"{\"name\":\"user 0\",\"email\":\"[email protected]\"}"}],
206+
"after": null,
207+
"next_after": null}
208+
}"#,
209+
) else {
210+
panic!("Expected data line")
211+
};
212+
213+
assert_eq!(data.bucket, "bkt");
214+
assert_eq!(data.after, None);
215+
assert_eq!(data.next_after, None);
216+
217+
assert_eq!(data.data.len(), 1);
218+
assert_matches!(
219+
&data.data[0],
220+
OplogEntry {
221+
checksum: 10,
222+
op_id: 1,
223+
object_id: Some(_),
224+
object_type: Some(_),
225+
op: OpType::PUT,
226+
subkey: None,
227+
data,
228+
}
229+
);
230+
}
231+
}

crates/core/src/sync/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod line;

crates/core/src/sync_types.rs

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)