Skip to content

Commit 1fb935e

Browse files
committed
Track partial checkpoints
1 parent a753713 commit 1fb935e

File tree

5 files changed

+116
-62
lines changed

5 files changed

+116
-62
lines changed

crates/core/src/bson/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
pub use de::Deserializer;
22
pub use error::BsonError;
3-
use parser::Parser;
43
use serde::Deserialize;
54
pub use writer::BsonWriter;
65

crates/core/src/sync/line.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ pub struct CheckpointDiff<'a> {
4848
pub updated_buckets: Vec<BucketChecksum<'a>>,
4949
#[serde(borrow)]
5050
pub removed_buckets: Vec<&'a str>,
51-
#[serde(borrow)]
52-
pub write_checkpoint: Option<&'a str>,
51+
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
52+
pub write_checkpoint: Option<i64>,
5353
}
5454

5555
#[derive(Deserialize, Debug)]

crates/core/src/sync/storage_adapter.rs

+5-16
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use core::{assert_matches::debug_assert_matches, fmt::Display};
22

33
use alloc::{
4-
collections::btree_map::BTreeMap,
54
string::{String, ToString},
65
vec::Vec,
76
};
@@ -47,22 +46,13 @@ impl StorageAdapter {
4746
})
4847
}
4948

50-
pub fn collect_local_bucket_state(
51-
&self,
52-
) -> Result<
53-
(
54-
Vec<BucketRequest>,
55-
BTreeMap<String, Option<BucketDescription>>,
56-
),
57-
SQLiteError,
58-
> {
49+
pub fn collect_bucket_requests(&self) -> Result<Vec<BucketRequest>, SQLiteError> {
5950
// language=SQLite
6051
let statement = self.db.prepare_v2(
6152
"SELECT name, last_op FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'",
6253
)?;
6354

6455
let mut requests = Vec::<BucketRequest>::new();
65-
let mut local_state = BTreeMap::<String, Option<BucketDescription>>::new();
6656

6757
while statement.step()? == ResultCode::ROW {
6858
let bucket_name = statement.column_text(0)?.to_string();
@@ -72,10 +62,9 @@ impl StorageAdapter {
7262
name: bucket_name.clone(),
7363
after: last_op.to_string(),
7464
});
75-
local_state.insert(bucket_name, None);
7665
}
7766

78-
Ok((requests, local_state))
67+
Ok(requests)
7968
}
8069

8170
pub fn delete_buckets<'a>(
@@ -179,7 +168,7 @@ GROUP BY bucket_list.bucket",
179168
}
180169

181170
let mut buckets = Vec::<BucketInfo>::new();
182-
for bucket in &checkpoint.buckets {
171+
for bucket in checkpoint.buckets.values() {
183172
if bucket.is_in_priority(priority) {
184173
buckets.push(BucketInfo {
185174
bucket: &bucket.bucket,
@@ -228,7 +217,7 @@ GROUP BY bucket_list.bucket",
228217
.db
229218
.prepare_v2("UPDATE ps_buckets SET last_op = ? WHERE name = ?")?;
230219

231-
for bucket in &checkpoint.buckets {
220+
for bucket in checkpoint.buckets.values() {
232221
if bucket.is_in_priority(priority) {
233222
update_bucket.bind_int64(1, checkpoint.last_op_id)?;
234223
update_bucket.bind_text(2, &bucket.bucket, sqlite::Destructor::STATIC)?;
@@ -255,7 +244,7 @@ GROUP BY bucket_list.bucket",
255244
priority,
256245
buckets: checkpoint
257246
.buckets
258-
.iter()
247+
.values()
259248
.filter_map(|item| {
260249
if item.is_in_priority(Some(priority)) {
261250
Some(item.bucket.as_str())

crates/core/src/sync/streaming_sync.rs

+107-41
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use sqlite_nostd::{self as sqlite, ResultCode};
2525
use super::{
2626
bucket_priority::BucketPriority,
2727
interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent},
28-
line::{BucketChecksum, Checkpoint, SyncLine},
28+
line::{BucketChecksum, Checkpoint, CheckpointDiff, SyncLine},
2929
operations::insert_bucket_operations,
3030
storage_adapter::{BucketDescription, StorageAdapter, SyncLocalResult},
3131
sync_status::{SyncDownloadProgress, SyncStatusContainer},
@@ -205,11 +205,10 @@ impl StreamingSyncIteration {
205205
}
206206

207207
async fn run(mut self) -> Result<(), SQLiteError> {
208-
let mut target = None::<OwnedCheckpoint>;
209208
let mut validated = None::<OwnedCheckpoint>;
210209
let mut applied = None::<OwnedCheckpoint>;
211210

212-
let mut bucket_map = self.prepare_request().await?;
211+
let mut target = SyncTarget::BeforeCheckpoint(self.prepare_request().await?);
213212

214213
loop {
215214
let event = Self::receive_event().await;
@@ -225,35 +224,40 @@ impl StreamingSyncIteration {
225224

226225
match line {
227226
SyncLine::Checkpoint(checkpoint) => {
228-
let new_target = OwnedCheckpoint::from(&checkpoint);
229-
230-
let mut to_delete: BTreeSet<&str> =
231-
bucket_map.keys().map(|s| s.as_str()).collect();
232-
let mut new_buckets = BTreeMap::<String, Option<BucketDescription>>::new();
233-
for bucket in &new_target.buckets {
234-
new_buckets.insert(
235-
bucket.bucket.clone(),
236-
Some(BucketDescription {
237-
priority: bucket.priority,
238-
name: bucket.bucket.clone(),
239-
}),
240-
);
241-
to_delete.remove(bucket.bucket.as_str());
242-
}
227+
let to_delete = target.track_checkpoint(&checkpoint);
243228

244-
self.adapter.delete_buckets(to_delete)?;
245-
let progress = self.load_progress(&new_target)?;
229+
self.adapter
230+
.delete_buckets(to_delete.iter().map(|b| b.as_str()))?;
231+
let progress = self.load_progress(target.target_checkpoint().unwrap())?;
232+
self.status.update(
233+
|s| s.start_tracking_checkpoint(progress),
234+
&mut event.instructions,
235+
);
236+
}
237+
SyncLine::CheckpointDiff(diff) => {
238+
let Some(target) = target.target_checkpoint_mut() else {
239+
event.instructions.push(Instruction::LogLine {
240+
severity: LogSeverity::WARNING,
241+
line: "Received checkpoint_diff without previous checkpoint"
242+
.to_string(),
243+
});
244+
break;
245+
};
246+
247+
target.apply_diff(&diff);
248+
self.adapter
249+
.delete_buckets(diff.removed_buckets.iter().copied())?;
250+
251+
let progress = self.load_progress(target)?;
246252
self.status.update(
247253
|s| s.start_tracking_checkpoint(progress),
248254
&mut event.instructions,
249255
);
250-
251-
bucket_map = new_buckets;
252-
target = Some(new_target);
253256
}
254-
SyncLine::CheckpointDiff(checkpoint_diff) => todo!(),
255257
SyncLine::CheckpointComplete(checkpoint_complete) => {
256-
let target = target.as_ref().expect("should have target checkpoint");
258+
let target = target
259+
.target_checkpoint()
260+
.expect("should have target checkpoint");
257261
let result = self.adapter.sync_local(target, None)?;
258262

259263
match result {
@@ -286,7 +290,9 @@ impl StreamingSyncIteration {
286290
}
287291
SyncLine::CheckpointPartiallyComplete(complete) => {
288292
let priority = complete.priority;
289-
let target = target.as_ref().expect("should have target checkpoint");
293+
let target = target
294+
.target_checkpoint()
295+
.expect("should have target checkpoint");
290296
let result = self.adapter.sync_local(target, Some(priority))?;
291297

292298
match result {
@@ -340,9 +346,7 @@ impl StreamingSyncIteration {
340346
)?)
341347
}
342348

343-
async fn prepare_request(
344-
&mut self,
345-
) -> Result<BTreeMap<String, Option<BucketDescription>>, SQLiteError> {
349+
async fn prepare_request(&mut self) -> Result<Vec<String>, SQLiteError> {
346350
let event = Self::receive_event().await;
347351
let SyncEvent::Initialize = event.event else {
348352
return Err(SQLiteError::from(ResultCode::MISUSE));
@@ -351,7 +355,8 @@ impl StreamingSyncIteration {
351355
self.status
352356
.update(|s| s.start_connecting(), &mut event.instructions);
353357

354-
let (requests, bucket_map) = self.adapter.collect_local_bucket_state()?;
358+
let requests = self.adapter.collect_bucket_requests()?;
359+
// Remember the *names* of the buckets we have locally (not their `after` op ids): this list
// seeds `SyncTarget::BeforeCheckpoint`, and the first checkpoint line is compared against it
// by bucket name to decide which local buckets need to be deleted.
let local_bucket_names: Vec<String> = requests.iter().map(|s| s.name.clone()).collect();
355360
let request = StreamingSyncRequest {
356361
buckets: requests,
357362
include_checksum: true,
@@ -363,27 +368,81 @@ impl StreamingSyncIteration {
363368
event
364369
.instructions
365370
.push(Instruction::EstablishSyncStream { request });
366-
Ok(bucket_map)
371+
Ok(local_bucket_names)
372+
}
373+
}
374+
375+
enum SyncTarget {
376+
/// We've received a checkpoint line towards the given checkpoint. The tracked checkpoint is
377+
/// updated for subsequent checkpoint or checkpoint_diff lines.
378+
Tracking(OwnedCheckpoint),
379+
/// We have not received a checkpoint message yet. We still keep a list of local buckets around
380+
/// so that we know which ones to delete depending on the first checkpoint message.
381+
BeforeCheckpoint(Vec<String>),
382+
}
383+
384+
impl SyncTarget {
385+
fn target_checkpoint(&self) -> Option<&OwnedCheckpoint> {
386+
match self {
387+
Self::Tracking(cp) => Some(cp),
388+
_ => None,
389+
}
390+
}
391+
392+
fn target_checkpoint_mut(&mut self) -> Option<&mut OwnedCheckpoint> {
393+
match self {
394+
Self::Tracking(cp) => Some(cp),
395+
_ => None,
396+
}
397+
}
398+
399+
fn track_checkpoint<'a>(&mut self, checkpoint: &Checkpoint<'a>) -> BTreeSet<String> {
400+
let mut to_delete: BTreeSet<String> = match &self {
401+
SyncTarget::Tracking(checkpoint) => checkpoint.buckets.keys().cloned().collect(),
402+
SyncTarget::BeforeCheckpoint(buckets) => buckets.iter().cloned().collect(),
403+
};
404+
405+
let mut buckets = BTreeMap::<String, OwnedBucketChecksum>::new();
406+
for bucket in &checkpoint.buckets {
407+
buckets.insert(bucket.bucket.to_string(), OwnedBucketChecksum::from(bucket));
408+
to_delete.remove(bucket.bucket);
409+
}
410+
411+
*self = SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets));
412+
to_delete
367413
}
368414
}
369415

370416
pub struct OwnedCheckpoint {
371417
pub last_op_id: i64,
372418
pub write_checkpoint: Option<i64>,
373-
pub buckets: Vec<OwnedBucketChecksum>,
419+
pub buckets: BTreeMap<String, OwnedBucketChecksum>,
374420
}
375421

376-
impl From<&'_ Checkpoint<'_>> for OwnedCheckpoint {
377-
fn from(value: &'_ Checkpoint<'_>) -> Self {
422+
impl OwnedCheckpoint {
423+
fn from_checkpoint<'a>(
424+
checkpoint: &Checkpoint<'a>,
425+
buckets: BTreeMap<String, OwnedBucketChecksum>,
426+
) -> Self {
378427
Self {
379-
last_op_id: value.last_op_id,
380-
write_checkpoint: value.write_checkpoint,
381-
buckets: value
382-
.buckets
383-
.iter()
384-
.map(OwnedBucketChecksum::from)
385-
.collect(),
428+
last_op_id: checkpoint.last_op_id,
429+
write_checkpoint: checkpoint.write_checkpoint,
430+
buckets: buckets,
431+
}
432+
}
433+
434+
fn apply_diff<'a>(&mut self, diff: &CheckpointDiff<'a>) {
435+
for removed in &diff.removed_buckets {
436+
self.buckets.remove(*removed);
437+
}
438+
439+
for updated in &diff.updated_buckets {
440+
let owned = OwnedBucketChecksum::from(updated);
441+
self.buckets.insert(owned.bucket.clone(), owned);
386442
}
443+
444+
self.last_op_id = diff.last_op_id;
445+
self.write_checkpoint = diff.write_checkpoint;
387446
}
388447
}
389448

@@ -402,6 +461,13 @@ impl OwnedBucketChecksum {
402461
Some(prio) => self.priority >= prio,
403462
}
404463
}
464+
465+
fn description(&self) -> BucketDescription {
466+
BucketDescription {
467+
priority: self.priority,
468+
name: self.bucket.clone(),
469+
}
470+
}
405471
}
406472

407473
impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {

crates/core/src/sync/sync_status.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl DownloadSyncStatus {
7575

7676
let lowest_priority = applied
7777
.buckets
78-
.iter()
78+
.values()
7979
.map(|bkt| bkt.priority)
8080
.max()
8181
.unwrap_or(BucketPriority::SENTINEL);
@@ -167,7 +167,7 @@ impl SyncDownloadProgress {
167167
>,
168168
) -> Result<Self, ResultCode> {
169169
let mut buckets = BTreeMap::<String, BucketProgress>::new();
170-
for bucket in &checkpoint.buckets {
170+
for bucket in checkpoint.buckets.values() {
171171
buckets.insert(
172172
bucket.bucket.clone(),
173173
BucketProgress {

0 commit comments

Comments
 (0)