Skip to content

Commit 2edd4c3

Browse files
committed
Refactor state machine
1 parent ad1242b commit 2edd4c3

File tree

7 files changed

+330
-101
lines changed

7 files changed

+330
-101
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 155 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ use crate::{
2020
error::{PowerSyncError, PowerSyncErrorCause},
2121
kv::client_id,
2222
state::DatabaseState,
23-
sync::{checkpoint::OwnedBucketChecksum, interface::StartSyncStream},
23+
sync::{
24+
checkpoint::OwnedBucketChecksum, interface::StartSyncStream, line::DataLine,
25+
sync_status::Timestamp, BucketPriority,
26+
},
2427
};
25-
use sqlite_nostd::{self as sqlite, ResultCode};
28+
use sqlite_nostd::{self as sqlite};
2629

2730
use super::{
2831
interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent},
@@ -245,50 +248,50 @@ impl StreamingSyncIteration {
245248
Wait { a: PhantomData }
246249
}
247250

248-
/// Handles a single sync line.
251+
/// Starts handling a single sync line without altering any in-memory state of the state
252+
/// machine.
249253
///
250-
/// When it returns `Ok(true)`, the sync iteration should be stopped. For errors, the type of
251-
/// error determines whether the iteration can continue.
252-
fn handle_line(
253-
&mut self,
254-
target: &mut SyncTarget,
254+
/// After this call succeeds, the returned value can be used to update the state. For a
255+
/// discussion on why this split is necessary, see [SyncStateMachineTransition].
256+
fn prepare_handling_sync_line<'a>(
257+
&self,
258+
target: &SyncTarget,
255259
event: &mut ActiveEvent,
256-
line: &SyncLine,
257-
) -> Result<bool, PowerSyncError> {
258-
match line {
260+
line: &'a SyncLine<'a>,
261+
) -> Result<SyncStateMachineTransition<'a>, PowerSyncError> {
262+
Ok(match line {
259263
SyncLine::Checkpoint(checkpoint) => {
260-
self.validated_but_not_applied = None;
261-
let to_delete = target.track_checkpoint(&checkpoint);
264+
let (to_delete, updated_target) = target.track_checkpoint(&checkpoint);
262265

263266
self.adapter
264267
.delete_buckets(to_delete.iter().map(|b| b.as_str()))?;
265-
let progress = self.load_progress(target.target_checkpoint().unwrap())?;
266-
self.status.update(
267-
|s| s.start_tracking_checkpoint(progress),
268-
&mut event.instructions,
269-
);
268+
let progress = self.load_progress(updated_target.target_checkpoint().unwrap())?;
269+
SyncStateMachineTransition::StartTrackingCheckpoint {
270+
progress,
271+
updated_target,
272+
}
270273
}
271274
SyncLine::CheckpointDiff(diff) => {
272-
let Some(target) = target.target_checkpoint_mut() else {
275+
let Some(target) = target.target_checkpoint() else {
273276
return Err(PowerSyncError::sync_protocol_error(
274277
"Received checkpoint_diff without previous checkpoint",
275278
PowerSyncErrorCause::Unknown,
276279
));
277280
};
278281

282+
let mut target = target.clone();
279283
target.apply_diff(&diff);
280-
self.validated_but_not_applied = None;
281284
self.adapter
282285
.delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?;
283286

284-
let progress = self.load_progress(target)?;
285-
self.status.update(
286-
|s| s.start_tracking_checkpoint(progress),
287-
&mut event.instructions,
288-
);
287+
let progress = self.load_progress(&target)?;
288+
SyncStateMachineTransition::StartTrackingCheckpoint {
289+
progress,
290+
updated_target: SyncTarget::Tracking(target),
291+
}
289292
}
290293
SyncLine::CheckpointComplete(_) => {
291-
let Some(target) = target.target_checkpoint_mut() else {
294+
let Some(target) = target.target_checkpoint() else {
292295
return Err(PowerSyncError::sync_protocol_error(
293296
"Received checkpoint complete without previous checkpoint",
294297
PowerSyncErrorCause::Unknown,
@@ -307,29 +310,34 @@ impl StreamingSyncIteration {
307310
severity: LogSeverity::WARNING,
308311
line: format!("Could not apply checkpoint, {checkpoint_result}").into(),
309312
});
310-
return Ok(true);
313+
SyncStateMachineTransition::CloseIteration
311314
}
312315
SyncLocalResult::PendingLocalChanges => {
313316
event.instructions.push(Instruction::LogLine {
314317
severity: LogSeverity::INFO,
315318
line: "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint.".into(),
316319
});
317320

318-
self.validated_but_not_applied = Some(target.clone());
321+
SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud {
322+
validated_but_not_applied: target.clone(),
323+
}
319324
}
320325
SyncLocalResult::ChangesApplied => {
321326
event.instructions.push(Instruction::LogLine {
322327
severity: LogSeverity::DEBUG,
323328
line: "Validated and applied checkpoint".into(),
324329
});
325330
event.instructions.push(Instruction::FlushFileSystem {});
326-
self.handle_checkpoint_applied(event)?;
331+
SyncStateMachineTransition::SyncLocalChangesApplied {
332+
partial: None,
333+
timestamp: self.adapter.now()?,
334+
}
327335
}
328336
}
329337
}
330338
SyncLine::CheckpointPartiallyComplete(complete) => {
331339
let priority = complete.priority;
332-
let Some(target) = target.target_checkpoint_mut() else {
340+
let Some(target) = target.target_checkpoint() else {
333341
return Err(PowerSyncError::state_error(
334342
"Received checkpoint complete without previous checkpoint",
335343
));
@@ -353,45 +361,105 @@ impl StreamingSyncIteration {
353361
)
354362
.into(),
355363
});
356-
return Ok(true);
364+
SyncStateMachineTransition::CloseIteration
357365
}
358366
SyncLocalResult::PendingLocalChanges => {
359367
// If we have pending uploads, we can't complete new checkpoints outside
360368
// of priority 0. We'll resolve this for a complete checkpoint later.
369+
SyncStateMachineTransition::Empty
361370
}
362371
SyncLocalResult::ChangesApplied => {
363372
let now = self.adapter.now()?;
364-
event.instructions.push(Instruction::FlushFileSystem {});
365-
self.status.update(
366-
|status| {
367-
status.partial_checkpoint_complete(priority, now);
368-
},
369-
&mut event.instructions,
370-
);
373+
SyncStateMachineTransition::SyncLocalChangesApplied {
374+
partial: Some(priority),
375+
timestamp: now,
376+
}
371377
}
372378
}
373379
}
374380
SyncLine::Data(data_line) => {
375-
self.status
376-
.update(|s| s.track_line(&data_line), &mut event.instructions);
377381
insert_bucket_operations(&self.adapter, &data_line)?;
382+
SyncStateMachineTransition::DataLineSaved { line: data_line }
378383
}
379384
SyncLine::KeepAlive(token) => {
380385
if token.is_expired() {
381386
// Token expired already - stop the connection immediately.
382387
event
383388
.instructions
384389
.push(Instruction::FetchCredentials { did_expire: true });
385-
return Ok(true);
390+
391+
SyncStateMachineTransition::CloseIteration
386392
} else if token.should_prefetch() {
387393
event
388394
.instructions
389395
.push(Instruction::FetchCredentials { did_expire: false });
396+
SyncStateMachineTransition::Empty
397+
} else {
398+
SyncStateMachineTransition::Empty
390399
}
391400
}
392-
}
401+
})
402+
}
393403

394-
Ok(false)
404+
/// Applies a sync state transition, returning whether the iteration should be stopped.
405+
fn apply_transition(
406+
&mut self,
407+
target: &mut SyncTarget,
408+
event: &mut ActiveEvent,
409+
transition: SyncStateMachineTransition,
410+
) -> bool {
411+
match transition {
412+
SyncStateMachineTransition::StartTrackingCheckpoint {
413+
progress,
414+
updated_target,
415+
} => {
416+
self.status.update(
417+
|s| s.start_tracking_checkpoint(progress),
418+
&mut event.instructions,
419+
);
420+
self.validated_but_not_applied = None;
421+
*target = updated_target;
422+
}
423+
SyncStateMachineTransition::DataLineSaved { line } => {
424+
self.status
425+
.update(|s| s.track_line(&line), &mut event.instructions);
426+
}
427+
SyncStateMachineTransition::CloseIteration => return true,
428+
SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud {
429+
validated_but_not_applied,
430+
} => {
431+
self.validated_but_not_applied = Some(validated_but_not_applied);
432+
}
433+
SyncStateMachineTransition::SyncLocalChangesApplied { partial, timestamp } => {
434+
if let Some(priority) = partial {
435+
self.status.update(
436+
|status| {
437+
status.partial_checkpoint_complete(priority, timestamp);
438+
},
439+
&mut event.instructions,
440+
);
441+
} else {
442+
self.handle_checkpoint_applied(event, timestamp);
443+
}
444+
}
445+
SyncStateMachineTransition::Empty => {}
446+
};
447+
448+
false
449+
}
450+
451+
/// Handles a single sync line.
452+
///
453+
/// When it returns `Ok(true)`, the sync iteration should be stopped. For errors, the type of
454+
/// error determines whether the iteration can continue.
455+
fn handle_line(
456+
&mut self,
457+
target: &mut SyncTarget,
458+
event: &mut ActiveEvent,
459+
line: &SyncLine,
460+
) -> Result<bool, PowerSyncError> {
461+
let transition = self.prepare_handling_sync_line(target, event, line)?;
462+
Ok(self.apply_transition(target, event, transition))
395463
}
396464

397465
/// Runs a full sync iteration, returning nothing when it completes regularly or an error when
@@ -432,7 +500,7 @@ impl StreamingSyncIteration {
432500
.into(),
433501
});
434502

435-
self.handle_checkpoint_applied(event)?;
503+
self.handle_checkpoint_applied(event, self.adapter.now()?);
436504
}
437505
_ => {
438506
event.instructions.push(Instruction::LogLine {
@@ -522,16 +590,13 @@ impl StreamingSyncIteration {
522590
Ok(local_bucket_names)
523591
}
524592

525-
fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent) -> Result<(), ResultCode> {
593+
fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent, timestamp: Timestamp) {
526594
event.instructions.push(Instruction::DidCompleteSync {});
527595

528-
let now = self.adapter.now()?;
529596
self.status.update(
530-
|status| status.applied_checkpoint(now),
597+
|status| status.applied_checkpoint(timestamp),
531598
&mut event.instructions,
532599
);
533-
534-
Ok(())
535600
}
536601
}
537602

@@ -553,18 +618,16 @@ impl SyncTarget {
553618
}
554619
}
555620

556-
fn target_checkpoint_mut(&mut self) -> Option<&mut OwnedCheckpoint> {
557-
match self {
558-
Self::Tracking(cp) => Some(cp),
559-
_ => None,
560-
}
561-
}
562-
563621
/// Starts tracking the received `Checkpoint`.
564622
///
565-
/// This updates the internal state and returns a set of buckets to delete because they've been
566-
/// tracked locally but not in the new checkpoint.
567-
fn track_checkpoint<'a>(&mut self, checkpoint: &Checkpoint<'a>) -> BTreeSet<String> {
623+
/// This returns a set of buckets to delete because they've been tracked locally but not in the
624+
/// checkpoint, as well as the updated state of the [SyncTarget] to apply after deleting those
625+
/// buckets.
626+
///
627+
/// The new state is not applied automatically - the old state should be kept in-memory until
628+
/// the buckets have actually been deleted so that the operation can be retried if deleting
629+
/// buckets fails.
630+
fn track_checkpoint<'a>(&self, checkpoint: &Checkpoint<'a>) -> (BTreeSet<String>, Self) {
568631
let mut to_delete: BTreeSet<String> = match &self {
569632
SyncTarget::Tracking(checkpoint) => checkpoint.buckets.keys().cloned().collect(),
570633
SyncTarget::BeforeCheckpoint(buckets) => buckets.iter().cloned().collect(),
@@ -576,8 +639,10 @@ impl SyncTarget {
576639
to_delete.remove(&*bucket.bucket);
577640
}
578641

579-
*self = SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets));
580-
to_delete
642+
(
643+
to_delete,
644+
SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets)),
645+
)
581646
}
582647
}
583648

@@ -614,3 +679,32 @@ impl OwnedCheckpoint {
614679
self.write_checkpoint = diff.write_checkpoint;
615680
}
616681
}
682+
683+
/// A transition representing pending changes between [StreamingSyncIteration::prepare_handling_sync_line]
684+
/// and [StreamingSyncIteration::apply_transition].
685+
///
686+
/// This split allows the main logic handling sync lines to take a non-mutable reference to internal
687+
/// client state, guaranteeing that it does not mutate state until changes have been written to the
688+
/// database. Only after those writes have succeeded are the internal state changes applied.
689+
///
690+
/// This split ensures that `powersync_control` calls are idempotent when running into temporary
691+
/// SQLite errors, a property we need for compatibility with e.g. WA-sqlite, where the VFS can
692+
/// return `BUSY` errors and the SQLite library automatically retries running statements.
693+
enum SyncStateMachineTransition<'a> {
694+
StartTrackingCheckpoint {
695+
progress: SyncDownloadProgress,
696+
updated_target: SyncTarget,
697+
},
698+
DataLineSaved {
699+
line: &'a DataLine<'a>,
700+
},
701+
SyncLocalFailedDueToPendingCrud {
702+
validated_but_not_applied: OwnedCheckpoint,
703+
},
704+
SyncLocalChangesApplied {
705+
partial: Option<BucketPriority>,
706+
timestamp: Timestamp,
707+
},
708+
CloseIteration,
709+
Empty,
710+
}

crates/core/src/sync/sync_status.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,12 @@ impl SyncDownloadProgress {
205205
);
206206
}
207207

208+
// Ignore errors here - SQLite seems to report errors from an earlier statement iteration
209+
// sometimes.
210+
let _ = adapter.progress_stmt.reset();
211+
208212
// Go through local bucket states to detect pending progress from previous sync iterations
209213
// that may have been interrupted.
210-
adapter.progress_stmt.reset()?;
211-
212214
while let Some(row) = adapter.step_progress()? {
213215
let Some(progress) = buckets.get_mut(row.bucket) else {
214216
continue;

0 commit comments

Comments
 (0)