Skip to content

Commit 7005f2a

Browse files
committed
Add function to determine whether local sync is active
1 parent 227b585 commit 7005f2a

File tree

9 files changed

+209
-28
lines changed

9 files changed

+209
-28
lines changed

crates/core/src/lib.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ extern crate alloc;
99

1010
use core::ffi::{c_char, c_int};
1111

12+
use alloc::sync::Arc;
1213
use sqlite::ResultCode;
1314
use sqlite_nostd as sqlite;
1415

16+
use crate::state::DatabaseState;
17+
1518
mod bson;
1619
mod checkpoint;
1720
mod crud_vtab;
@@ -26,6 +29,7 @@ mod migrations;
2629
mod operations;
2730
mod operations_vtab;
2831
mod schema;
32+
mod state;
2933
mod sync;
3034
mod sync_local;
3135
mod util;
@@ -53,6 +57,8 @@ pub extern "C" fn sqlite3_powersync_init(
5357
}
5458

5559
fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
60+
let state = Arc::new(DatabaseState::new());
61+
5662
crate::version::register(db)?;
5763
crate::views::register(db)?;
5864
crate::uuid::register(db)?;
@@ -62,10 +68,11 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
6268
crate::view_admin::register(db)?;
6369
crate::checkpoint::register(db)?;
6470
crate::kv::register(db)?;
65-
sync::register(db)?;
71+
crate::state::register(db, state.clone())?;
72+
sync::register(db, state.clone())?;
6673

6774
crate::schema::register(db)?;
68-
crate::operations_vtab::register(db)?;
75+
crate::operations_vtab::register(db, state.clone())?;
6976
crate::crud_vtab::register(db)?;
7077

7178
Ok(())

crates/core/src/operations_vtab.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
extern crate alloc;
22

33
use alloc::boxed::Box;
4+
use alloc::sync::Arc;
45
use core::ffi::{c_char, c_int, c_void};
56

67
use sqlite::{Connection, ResultCode, Value};
@@ -9,21 +10,23 @@ use sqlite_nostd as sqlite;
910
use crate::operations::{
1011
clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation,
1112
};
13+
use crate::state::DatabaseState;
1214
use crate::sync_local::sync_local;
1315
use crate::vtab_util::*;
1416

1517
#[repr(C)]
1618
struct VirtualTable {
1719
base: sqlite::vtab,
1820
db: *mut sqlite::sqlite3,
21+
state: Arc<DatabaseState>,
1922

2023
target_applied: bool,
2124
target_validated: bool,
2225
}
2326

2427
extern "C" fn connect(
2528
db: *mut sqlite::sqlite3,
26-
_aux: *mut c_void,
29+
aux: *mut c_void,
2730
_argc: c_int,
2831
_argv: *const *const c_char,
2932
vtab: *mut *mut sqlite::vtab,
@@ -43,6 +46,14 @@ extern "C" fn connect(
4346
zErrMsg: core::ptr::null_mut(),
4447
},
4548
db,
49+
state: {
50+
// Increase refcount - we can't use from_raw alone because we don't own the aux
51+
// data (connect could be called multiple times).
52+
let state = Arc::from_raw(aux as *mut DatabaseState);
53+
let clone = state.clone();
54+
core::mem::forget(state);
55+
clone
56+
},
4657
target_validated: false,
4758
target_applied: false,
4859
}));
@@ -83,7 +94,7 @@ extern "C" fn update(
8394
let result = insert_operation(db, args[3].text());
8495
vtab_result(vtab, result)
8596
} else if op == "sync_local" {
86-
let result = sync_local(db, &args[3]);
97+
let result = sync_local(&tab.state, db, &args[3]);
8798
if let Ok(result_row) = result {
8899
unsafe {
89100
*p_row_id = result_row;
@@ -139,8 +150,13 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
139150
xIntegrity: None,
140151
};
141152

142-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
143-
db.create_module_v2("powersync_operations", &MODULE, None, None)?;
153+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
154+
db.create_module_v2(
155+
"powersync_operations",
156+
&MODULE,
157+
Some(Arc::into_raw(state) as *mut c_void),
158+
Some(DatabaseState::destroy_arc),
159+
)?;
144160

145161
Ok(())
146162
}

crates/core/src/state.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use core::{
2+
ffi::{c_int, c_void},
3+
sync::atomic::{AtomicBool, Ordering},
4+
};
5+
6+
use alloc::sync::Arc;
7+
use sqlite::{Connection, ResultCode};
8+
use sqlite_nostd::{self as sqlite, Context};
9+
10+
/// State that is shared for a SQLite database connection after the core extension has been
11+
/// registered on it.
12+
///
13+
/// `init_extension` allocates an instance of this in an `Arc` that is shared as user-data for
14+
/// functions/vtabs that need access to it.
15+
pub struct DatabaseState {
16+
pub is_in_sync_local: AtomicBool,
17+
}
18+
19+
impl DatabaseState {
20+
pub fn new() -> Self {
21+
DatabaseState {
22+
is_in_sync_local: AtomicBool::new(false),
23+
}
24+
}
25+
26+
pub fn sync_local_guard<'a>(&'a self) -> impl Drop + use<'a> {
27+
self.is_in_sync_local
28+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
29+
.expect("should not be syncing already");
30+
31+
struct ClearOnDrop<'a>(&'a DatabaseState);
32+
33+
impl Drop for ClearOnDrop<'_> {
34+
fn drop(&mut self) {
35+
self.0.is_in_sync_local.store(false, Ordering::Release);
36+
}
37+
}
38+
39+
ClearOnDrop(self)
40+
}
41+
42+
pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) {
43+
drop(Arc::from_raw(ptr.cast::<DatabaseState>()));
44+
}
45+
}
46+
47+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
48+
unsafe extern "C" fn func(
49+
ctx: *mut sqlite::context,
50+
_argc: c_int,
51+
_argv: *mut *mut sqlite::value,
52+
) {
53+
let data = ctx.user_data().cast::<DatabaseState>();
54+
let data = unsafe { data.as_ref() }.unwrap();
55+
56+
ctx.result_int(if data.is_in_sync_local.load(Ordering::Relaxed) {
57+
1
58+
} else {
59+
0
60+
});
61+
}
62+
63+
db.create_function_v2(
64+
"powersync_in_sync_operation",
65+
0,
66+
0,
67+
Some(Arc::into_raw(state) as *mut c_void),
68+
Some(func),
69+
None,
70+
None,
71+
Some(DatabaseState::destroy_arc),
72+
)?;
73+
Ok(())
74+
}

crates/core/src/sync/interface.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use alloc::borrow::Cow;
55
use alloc::boxed::Box;
66
use alloc::rc::Rc;
77
use alloc::string::ToString;
8+
use alloc::sync::Arc;
89
use alloc::{string::String, vec::Vec};
910
use serde::{Deserialize, Serialize};
1011
use sqlite::{ResultCode, Value};
@@ -13,6 +14,7 @@ use sqlite_nostd::{Connection, Context};
1314

1415
use crate::error::SQLiteError;
1516
use crate::schema::Schema;
17+
use crate::state::DatabaseState;
1618

1719
use super::streaming_sync::SyncClient;
1820
use super::sync_status::DownloadSyncStatus;
@@ -121,7 +123,7 @@ struct SqlController {
121123
client: SyncClient,
122124
}
123125

124-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
126+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
125127
extern "C" fn control(
126128
ctx: *mut sqlite::context,
127129
argc: c_int,
@@ -202,7 +204,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
202204
}
203205

204206
let controller = Box::new(SqlController {
205-
client: SyncClient::new(db),
207+
client: SyncClient::new(db, state),
206208
});
207209

208210
db.create_function_v2(

crates/core/src/sync/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use alloc::sync::Arc;
12
use sqlite_nostd::{self as sqlite, ResultCode};
23

34
mod bucket_priority;
@@ -13,6 +14,8 @@ mod sync_status;
1314
pub use bucket_priority::BucketPriority;
1415
pub use checksum::Checksum;
1516

16-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
17-
interface::register(db)
17+
use crate::state::DatabaseState;
18+
19+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
20+
interface::register(db, state)
1821
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::{assert_matches::debug_assert_matches, fmt::Display};
22

3-
use alloc::{string::ToString, vec::Vec};
3+
use alloc::{string::ToString, sync::Arc, vec::Vec};
44
use serde::Serialize;
55
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
66
use streaming_iterator::StreamingIterator;
@@ -10,6 +10,7 @@ use crate::{
1010
ext::SafeManagedStmt,
1111
operations::delete_bucket,
1212
schema::Schema,
13+
state::DatabaseState,
1314
sync::checkpoint::{validate_checkpoint, ChecksumMismatch},
1415
sync_local::{PartialSyncOperation, SyncOperation},
1516
};
@@ -144,6 +145,7 @@ impl StorageAdapter {
144145

145146
pub fn sync_local(
146147
&self,
148+
state: &DatabaseState,
147149
checkpoint: &OwnedCheckpoint,
148150
priority: Option<BucketPriority>,
149151
schema: &Schema,
@@ -185,7 +187,7 @@ impl StorageAdapter {
185187

186188
let sync_result = match priority {
187189
None => {
188-
let mut sync = SyncOperation::new(self.db, None);
190+
let mut sync = SyncOperation::new(state, self.db, None);
189191
sync.use_schema(schema);
190192
sync.apply()
191193
}
@@ -208,6 +210,7 @@ impl StorageAdapter {
208210
// TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters.
209211
let serialized_args = serde_json::to_string(&args)?;
210212
let mut sync = SyncOperation::new(
213+
state,
211214
self.db,
212215
Some(PartialSyncOperation {
213216
priority,

crates/core/src/sync/streaming_sync.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use alloc::{
1010
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
1111
format,
1212
string::{String, ToString},
13+
sync::Arc,
1314
vec::Vec,
1415
};
1516
use futures_lite::FutureExt;
@@ -18,6 +19,7 @@ use crate::{
1819
bson,
1920
error::SQLiteError,
2021
kv::client_id,
22+
state::DatabaseState,
2123
sync::{checkpoint::OwnedBucketChecksum, interface::StartSyncStream},
2224
};
2325
use sqlite_nostd::{self as sqlite, ResultCode};
@@ -37,14 +39,16 @@ use super::{
3739
/// initialized.
3840
pub struct SyncClient {
3941
db: *mut sqlite::sqlite3,
42+
db_state: Arc<DatabaseState>,
4043
/// The current [ClientState] (essentially an optional [StreamingSyncIteration]).
4144
state: ClientState,
4245
}
4346

4447
impl SyncClient {
45-
pub fn new(db: *mut sqlite::sqlite3) -> Self {
48+
pub fn new(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Self {
4649
Self {
4750
db,
51+
db_state: state,
4852
state: ClientState::Idle,
4953
}
5054
}
@@ -57,7 +61,7 @@ impl SyncClient {
5761
SyncControlRequest::StartSyncStream(options) => {
5862
self.state.tear_down()?;
5963

60-
let mut handle = SyncIterationHandle::new(self.db, options)?;
64+
let mut handle = SyncIterationHandle::new(self.db, options, self.db_state.clone())?;
6165
let instructions = handle.initialize()?;
6266
self.state = ClientState::IterationActive(handle);
6367

@@ -125,10 +129,15 @@ struct SyncIterationHandle {
125129
impl SyncIterationHandle {
126130
/// Creates a new sync iteration in a pending state by preparing statements for
127131
/// [StorageAdapter] and setting up the initial downloading state for [StorageAdapter] .
128-
fn new(db: *mut sqlite::sqlite3, options: StartSyncStream) -> Result<Self, ResultCode> {
132+
fn new(
133+
db: *mut sqlite::sqlite3,
134+
options: StartSyncStream,
135+
state: Arc<DatabaseState>,
136+
) -> Result<Self, ResultCode> {
129137
let runner = StreamingSyncIteration {
130138
db,
131139
options,
140+
state,
132141
adapter: StorageAdapter::new(db)?,
133142
status: SyncStatusContainer::new(),
134143
};
@@ -192,6 +201,7 @@ impl<'a> ActiveEvent<'a> {
192201

193202
struct StreamingSyncIteration {
194203
db: *mut sqlite::sqlite3,
204+
state: Arc<DatabaseState>,
195205
adapter: StorageAdapter,
196206
options: StartSyncStream,
197207
status: SyncStatusContainer,
@@ -246,9 +256,12 @@ impl StreamingSyncIteration {
246256
SyncEvent::BinaryLine { data } => bson::from_bytes(data)?,
247257
SyncEvent::UploadFinished => {
248258
if let Some(checkpoint) = validated_but_not_applied.take() {
249-
let result =
250-
self.adapter
251-
.sync_local(&checkpoint, None, &self.options.schema)?;
259+
let result = self.adapter.sync_local(
260+
&self.state,
261+
&checkpoint,
262+
None,
263+
&self.options.schema,
264+
)?;
252265

253266
match result {
254267
SyncLocalResult::ChangesApplied => {
@@ -324,9 +337,9 @@ impl StreamingSyncIteration {
324337
),
325338
));
326339
};
327-
let result = self
328-
.adapter
329-
.sync_local(target, None, &self.options.schema)?;
340+
let result =
341+
self.adapter
342+
.sync_local(&self.state, target, None, &self.options.schema)?;
330343

331344
match result {
332345
SyncLocalResult::ChecksumFailure(checkpoint_result) => {
@@ -369,9 +382,12 @@ impl StreamingSyncIteration {
369382
),
370383
));
371384
};
372-
let result =
373-
self.adapter
374-
.sync_local(target, Some(priority), &self.options.schema)?;
385+
let result = self.adapter.sync_local(
386+
&self.state,
387+
target,
388+
Some(priority),
389+
&self.options.schema,
390+
)?;
375391

376392
match result {
377393
SyncLocalResult::ChecksumFailure(checkpoint_result) => {

0 commit comments

Comments
 (0)