@@ -25,7 +25,7 @@ use sqlite_nostd::{self as sqlite, ResultCode};
25
25
use super :: {
26
26
bucket_priority:: BucketPriority ,
27
27
interface:: { Instruction , LogSeverity , StreamingSyncRequest , SyncControlRequest , SyncEvent } ,
28
- line:: { BucketChecksum , Checkpoint , SyncLine } ,
28
+ line:: { BucketChecksum , Checkpoint , CheckpointDiff , SyncLine } ,
29
29
operations:: insert_bucket_operations,
30
30
storage_adapter:: { BucketDescription , StorageAdapter , SyncLocalResult } ,
31
31
sync_status:: { SyncDownloadProgress , SyncStatusContainer } ,
@@ -205,11 +205,10 @@ impl StreamingSyncIteration {
205
205
}
206
206
207
207
async fn run ( mut self ) -> Result < ( ) , SQLiteError > {
208
- let mut target = None :: < OwnedCheckpoint > ;
209
208
let mut validated = None :: < OwnedCheckpoint > ;
210
209
let mut applied = None :: < OwnedCheckpoint > ;
211
210
212
- let mut bucket_map = self . prepare_request ( ) . await ?;
211
+ let mut target = SyncTarget :: BeforeCheckpoint ( self . prepare_request ( ) . await ?) ;
213
212
214
213
loop {
215
214
let event = Self :: receive_event ( ) . await ;
@@ -225,35 +224,40 @@ impl StreamingSyncIteration {
225
224
226
225
match line {
227
226
SyncLine :: Checkpoint ( checkpoint) => {
228
- let new_target = OwnedCheckpoint :: from ( & checkpoint) ;
229
-
230
- let mut to_delete: BTreeSet < & str > =
231
- bucket_map. keys ( ) . map ( |s| s. as_str ( ) ) . collect ( ) ;
232
- let mut new_buckets = BTreeMap :: < String , Option < BucketDescription > > :: new ( ) ;
233
- for bucket in & new_target. buckets {
234
- new_buckets. insert (
235
- bucket. bucket . clone ( ) ,
236
- Some ( BucketDescription {
237
- priority : bucket. priority ,
238
- name : bucket. bucket . clone ( ) ,
239
- } ) ,
240
- ) ;
241
- to_delete. remove ( bucket. bucket . as_str ( ) ) ;
242
- }
227
+ let to_delete = target. track_checkpoint ( & checkpoint) ;
243
228
244
- self . adapter . delete_buckets ( to_delete) ?;
245
- let progress = self . load_progress ( & new_target) ?;
229
+ self . adapter
230
+ . delete_buckets ( to_delete. iter ( ) . map ( |b| b. as_str ( ) ) ) ?;
231
+ let progress = self . load_progress ( target. target_checkpoint ( ) . unwrap ( ) ) ?;
232
+ self . status . update (
233
+ |s| s. start_tracking_checkpoint ( progress) ,
234
+ & mut event. instructions ,
235
+ ) ;
236
+ }
237
+ SyncLine :: CheckpointDiff ( diff) => {
238
+ let Some ( target) = target. target_checkpoint_mut ( ) else {
239
+ event. instructions . push ( Instruction :: LogLine {
240
+ severity : LogSeverity :: WARNING ,
241
+ line : "Received checkpoint_diff without previous checkpoint"
242
+ . to_string ( ) ,
243
+ } ) ;
244
+ break ;
245
+ } ;
246
+
247
+ target. apply_diff ( & diff) ;
248
+ self . adapter
249
+ . delete_buckets ( diff. removed_buckets . iter ( ) . copied ( ) ) ?;
250
+
251
+ let progress = self . load_progress ( target) ?;
246
252
self . status . update (
247
253
|s| s. start_tracking_checkpoint ( progress) ,
248
254
& mut event. instructions ,
249
255
) ;
250
-
251
- bucket_map = new_buckets;
252
- target = Some ( new_target) ;
253
256
}
254
- SyncLine :: CheckpointDiff ( checkpoint_diff) => todo ! ( ) ,
255
257
SyncLine :: CheckpointComplete ( checkpoint_complete) => {
256
- let target = target. as_ref ( ) . expect ( "should have target checkpoint" ) ;
258
+ let target = target
259
+ . target_checkpoint ( )
260
+ . expect ( "should have target checkpoint" ) ;
257
261
let result = self . adapter . sync_local ( target, None ) ?;
258
262
259
263
match result {
@@ -286,7 +290,9 @@ impl StreamingSyncIteration {
286
290
}
287
291
SyncLine :: CheckpointPartiallyComplete ( complete) => {
288
292
let priority = complete. priority ;
289
- let target = target. as_ref ( ) . expect ( "should have target checkpoint" ) ;
293
+ let target = target
294
+ . target_checkpoint ( )
295
+ . expect ( "should have target checkpoint" ) ;
290
296
let result = self . adapter . sync_local ( target, Some ( priority) ) ?;
291
297
292
298
match result {
@@ -340,9 +346,7 @@ impl StreamingSyncIteration {
340
346
) ?)
341
347
}
342
348
343
- async fn prepare_request (
344
- & mut self ,
345
- ) -> Result < BTreeMap < String , Option < BucketDescription > > , SQLiteError > {
349
+ async fn prepare_request ( & mut self ) -> Result < Vec < String > , SQLiteError > {
346
350
let event = Self :: receive_event ( ) . await ;
347
351
let SyncEvent :: Initialize = event. event else {
348
352
return Err ( SQLiteError :: from ( ResultCode :: MISUSE ) ) ;
@@ -351,7 +355,8 @@ impl StreamingSyncIteration {
351
355
self . status
352
356
. update ( |s| s. start_connecting ( ) , & mut event. instructions ) ;
353
357
354
- let ( requests, bucket_map) = self . adapter . collect_local_bucket_state ( ) ?;
358
+ let requests = self . adapter . collect_bucket_requests ( ) ?;
359
+ let local_bucket_names: Vec < String > = requests. iter ( ) . map ( |s| s. after . clone ( ) ) . collect ( ) ;
355
360
let request = StreamingSyncRequest {
356
361
buckets : requests,
357
362
include_checksum : true ,
@@ -363,27 +368,81 @@ impl StreamingSyncIteration {
363
368
event
364
369
. instructions
365
370
. push ( Instruction :: EstablishSyncStream { request } ) ;
366
- Ok ( bucket_map)
371
+ Ok ( local_bucket_names)
372
+ }
373
+ }
374
+
375
+ enum SyncTarget {
376
+ /// We've received a checkpoint line towards the given checkpoint. The tracked checkpoint is
377
+ /// updated for subsequent checkpoint or checkpoint_diff lines.
378
+ Tracking ( OwnedCheckpoint ) ,
379
+ /// We have not received a checkpoint message yet. We still keep a list of local buckets around
380
+ /// so that we know which ones to delete depending on the first checkpoint message.
381
+ BeforeCheckpoint ( Vec < String > ) ,
382
+ }
383
+
384
+ impl SyncTarget {
385
+ fn target_checkpoint ( & self ) -> Option < & OwnedCheckpoint > {
386
+ match self {
387
+ Self :: Tracking ( cp) => Some ( cp) ,
388
+ _ => None ,
389
+ }
390
+ }
391
+
392
+ fn target_checkpoint_mut ( & mut self ) -> Option < & mut OwnedCheckpoint > {
393
+ match self {
394
+ Self :: Tracking ( cp) => Some ( cp) ,
395
+ _ => None ,
396
+ }
397
+ }
398
+
399
+ fn track_checkpoint < ' a > ( & mut self , checkpoint : & Checkpoint < ' a > ) -> BTreeSet < String > {
400
+ let mut to_delete: BTreeSet < String > = match & self {
401
+ SyncTarget :: Tracking ( checkpoint) => checkpoint. buckets . keys ( ) . cloned ( ) . collect ( ) ,
402
+ SyncTarget :: BeforeCheckpoint ( buckets) => buckets. iter ( ) . cloned ( ) . collect ( ) ,
403
+ } ;
404
+
405
+ let mut buckets = BTreeMap :: < String , OwnedBucketChecksum > :: new ( ) ;
406
+ for bucket in & checkpoint. buckets {
407
+ buckets. insert ( bucket. bucket . to_string ( ) , OwnedBucketChecksum :: from ( bucket) ) ;
408
+ to_delete. remove ( bucket. bucket ) ;
409
+ }
410
+
411
+ * self = SyncTarget :: Tracking ( OwnedCheckpoint :: from_checkpoint ( checkpoint, buckets) ) ;
412
+ to_delete
367
413
}
368
414
}
369
415
370
416
pub struct OwnedCheckpoint {
371
417
pub last_op_id : i64 ,
372
418
pub write_checkpoint : Option < i64 > ,
373
- pub buckets : Vec < OwnedBucketChecksum > ,
419
+ pub buckets : BTreeMap < String , OwnedBucketChecksum > ,
374
420
}
375
421
376
- impl From < & ' _ Checkpoint < ' _ > > for OwnedCheckpoint {
377
- fn from ( value : & ' _ Checkpoint < ' _ > ) -> Self {
422
+ impl OwnedCheckpoint {
423
+ fn from_checkpoint < ' a > (
424
+ checkpoint : & Checkpoint < ' a > ,
425
+ buckets : BTreeMap < String , OwnedBucketChecksum > ,
426
+ ) -> Self {
378
427
Self {
379
- last_op_id : value. last_op_id ,
380
- write_checkpoint : value. write_checkpoint ,
381
- buckets : value
382
- . buckets
383
- . iter ( )
384
- . map ( OwnedBucketChecksum :: from)
385
- . collect ( ) ,
428
+ last_op_id : checkpoint. last_op_id ,
429
+ write_checkpoint : checkpoint. write_checkpoint ,
430
+ buckets : buckets,
431
+ }
432
+ }
433
+
434
+ fn apply_diff < ' a > ( & mut self , diff : & CheckpointDiff < ' a > ) {
435
+ for removed in & diff. removed_buckets {
436
+ self . buckets . remove ( * removed) ;
437
+ }
438
+
439
+ for updated in & diff. updated_buckets {
440
+ let owned = OwnedBucketChecksum :: from ( updated) ;
441
+ self . buckets . insert ( owned. bucket . clone ( ) , owned) ;
386
442
}
443
+
444
+ self . last_op_id = diff. last_op_id ;
445
+ self . write_checkpoint = diff. write_checkpoint ;
387
446
}
388
447
}
389
448
@@ -402,6 +461,13 @@ impl OwnedBucketChecksum {
402
461
Some ( prio) => self . priority >= prio,
403
462
}
404
463
}
464
+
465
+ fn description ( & self ) -> BucketDescription {
466
+ BucketDescription {
467
+ priority : self . priority ,
468
+ name : self . bucket . clone ( ) ,
469
+ }
470
+ }
405
471
}
406
472
407
473
impl From < & ' _ BucketChecksum < ' _ > > for OwnedBucketChecksum {
0 commit comments