Skip to content

Commit 3a72593

Browse files
committed
fix: use oneshot channel for cancellation
1 parent b551a60 commit 3a72593

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

libsql/src/database/builder.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,10 @@ cfg_sync! {
644644

645645
let mut bg_abort: Option<std::sync::Arc<crate::sync::DropAbort>> = None;
646646

647+
647648
if let Some(sync_interval) = sync_interval {
649+
let (cancel_tx, mut cancel_rx) = tokio::sync::oneshot::channel::<()>();
650+
648651
let sync_span = tracing::debug_span!("sync_interval");
649652
let _enter = sync_span.enter();
650653

@@ -659,7 +662,8 @@ cfg_sync! {
659662
// `bootstrap_db` (for synced dbs) before calling connect. Otherwise, the sync
660663
// protocol skips calling `export` endpoint causing slowdown in initial bootstrap.
661664
let conn = db.connect()?;
662-
let jh = tokio::spawn(
665+
666+
tokio::spawn(
663667
async move {
664668
let mut interval = tokio::time::interval(sync_interval);
665669

@@ -669,21 +673,32 @@ cfg_sync! {
669673
interval.tick().await;
670674

671675
let mut ctx = sync_ctx.lock().await;
676+
672677
if remote_writes {
673-
if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await {
674-
tracing::error!("sync error: {}", e);
678+
tokio::select! {
679+
_ = &mut cancel_rx => break,
680+
result = crate::sync::try_pull(&mut ctx, &conn) => {
681+
if let Err(e) = result {
682+
tracing::error!("sync error: {}", e);
683+
}
684+
}
675685
}
676686
} else {
677-
if let Err(e) = crate::sync::sync_offline(&mut ctx, &conn).await {
678-
tracing::error!("sync error: {}", e);
687+
tokio::select! {
688+
_ = &mut cancel_rx => break,
689+
result = crate::sync::sync_offline(&mut ctx, &conn) => {
690+
if let Err(e) = result {
691+
tracing::error!("sync error: {}", e);
692+
}
693+
}
679694
}
680695
}
681696
}
682697
}
683698
.instrument(tracing::debug_span!("sync interval thread")),
684699
);
685700

686-
bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(jh.abort_handle())));
701+
bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(Some(cancel_tx))));
687702
}
688703

689704
Ok(Database {

libsql/src/sync.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bytes::Bytes;
66
use chrono::Utc;
77
use http::{HeaderValue, StatusCode};
88
use hyper::Body;
9-
use tokio::{io::AsyncWriteExt as _, task::AbortHandle};
9+
use tokio::io::AsyncWriteExt as _;
1010
use uuid::Uuid;
1111

1212
#[cfg(test)]
@@ -81,11 +81,14 @@ pub struct PushResult {
8181
baton: Option<String>,
8282
}
8383

84-
pub struct DropAbort(pub AbortHandle);
84+
pub struct DropAbort(pub Option<tokio::sync::oneshot::Sender<()>>);
8585

8686
impl Drop for DropAbort {
8787
fn drop(&mut self) {
88-
self.0.abort();
88+
tracing::debug!("aborting");
89+
if let Some(sender) = self.0.take() { // Replaces with None, returns owned value
90+
let _ = sender.send(()); // Now we own the sender
91+
}
8992
}
9093
}
9194

@@ -846,8 +849,6 @@ async fn try_push(
846849

847850
let mut frame_no = start_frame_no;
848851
while frame_no <= end_frame_no {
849-
tokio::task::yield_now().await;
850-
851852
let batch_size = sync_ctx.push_batch_size.min(end_frame_no - frame_no + 1);
852853
let mut frames = conn.wal_get_frame(frame_no, page_size)?;
853854
if batch_size > 1 {
@@ -895,8 +896,6 @@ pub async fn try_pull(
895896
let mut err = None;
896897

897898
loop {
898-
tokio::task::yield_now().await;
899-
900899
let generation = sync_ctx.durable_generation();
901900
let frame_no = sync_ctx.durable_frame_num() + 1;
902901
match sync_ctx.pull_one_frame(generation, frame_no).await {

0 commit comments

Comments
 (0)