Skip to content

Commit 47044b7

Browse files
authored
Merge pull request #87 from powersync-ja/raw-tables
Raw tables
2 parents 8de98da + cf68afd commit 47044b7

File tree

16 files changed

+673
-103
lines changed

16 files changed

+673
-103
lines changed

crates/core/src/crud_vtab.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ extern crate alloc;
22

33
use alloc::boxed::Box;
44
use alloc::string::String;
5+
use alloc::sync::Arc;
56
use const_format::formatcp;
67
use core::ffi::{c_char, c_int, c_void, CStr};
7-
use core::ptr::null_mut;
8+
use core::sync::atomic::Ordering;
89
use serde::Serialize;
910
use serde_json::value::RawValue;
1011

@@ -15,6 +16,7 @@ use sqlite_nostd::{self as sqlite, ColumnType};
1516
use crate::error::SQLiteError;
1617
use crate::ext::SafeManagedStmt;
1718
use crate::schema::TableInfoFlags;
19+
use crate::state::DatabaseState;
1820
use crate::util::MAX_OP_ID;
1921
use crate::vtab_util::*;
2022

@@ -41,6 +43,7 @@ struct VirtualTable {
4143
db: *mut sqlite::sqlite3,
4244
current_tx: Option<ActiveCrudTransaction>,
4345
is_simple: bool,
46+
state: Arc<DatabaseState>,
4447
}
4548

4649
struct ActiveCrudTransaction {
@@ -86,6 +89,15 @@ impl VirtualTable {
8689
.ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?;
8790
let db = self.db;
8891

92+
if self.state.is_in_sync_local.load(Ordering::Relaxed) {
93+
// Don't collect CRUD writes while we're syncing the local database - writes made here
94+
// aren't writes we should upload.
95+
// This normally doesn't happen because we insert directly into the data tables, but
96+
// users might have custom raw tables used for sycing with triggers on them to call
97+
// this function. And those specifically should not trigger here.
98+
return Ok(());
99+
}
100+
89101
match &mut current_tx.mode {
90102
CrudTransactionMode::Manual(manual) => {
91103
// Columns are (data TEXT, options INT HIDDEN)
@@ -258,7 +270,7 @@ fn prepare_lazy(
258270

259271
extern "C" fn connect(
260272
db: *mut sqlite::sqlite3,
261-
_aux: *mut c_void,
273+
aux: *mut c_void,
262274
argc: c_int,
263275
argv: *const *const c_char,
264276
vtab: *mut *mut sqlite::vtab,
@@ -289,6 +301,14 @@ extern "C" fn connect(
289301
pModule: core::ptr::null(),
290302
zErrMsg: core::ptr::null_mut(),
291303
},
304+
state: {
305+
// Increase refcount - we can't use from_raw alone because we don't own the aux
306+
// data (connect could be called multiple times).
307+
let state = Arc::from_raw(aux as *mut DatabaseState);
308+
let clone = state.clone();
309+
core::mem::forget(state);
310+
clone
311+
},
292312
db,
293313
current_tx: None,
294314
is_simple,
@@ -380,20 +400,20 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
380400
xIntegrity: None,
381401
};
382402

383-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
403+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
384404
sqlite::convert_rc(sqlite::create_module_v2(
385405
db,
386406
SIMPLE_NAME.as_ptr(),
387407
&MODULE,
388-
null_mut(),
389-
None,
408+
Arc::into_raw(state.clone()) as *mut c_void,
409+
Some(DatabaseState::destroy_arc),
390410
))?;
391411
sqlite::convert_rc(sqlite::create_module_v2(
392412
db,
393413
MANUAL_NAME.as_ptr(),
394414
&MODULE,
395-
null_mut(),
396-
None,
415+
Arc::into_raw(state) as *mut c_void,
416+
Some(DatabaseState::destroy_arc),
397417
))?;
398418

399419
Ok(())

crates/core/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ extern crate alloc;
99

1010
use core::ffi::{c_char, c_int};
1111

12+
use alloc::sync::Arc;
1213
use sqlite::ResultCode;
1314
use sqlite_nostd as sqlite;
1415

16+
use crate::state::DatabaseState;
17+
1518
mod bson;
1619
mod checkpoint;
1720
mod crud_vtab;
@@ -26,6 +29,7 @@ mod migrations;
2629
mod operations;
2730
mod operations_vtab;
2831
mod schema;
32+
mod state;
2933
mod sync;
3034
mod sync_local;
3135
mod util;
@@ -53,6 +57,8 @@ pub extern "C" fn sqlite3_powersync_init(
5357
}
5458

5559
fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
60+
let state = Arc::new(DatabaseState::new());
61+
5662
crate::version::register(db)?;
5763
crate::views::register(db)?;
5864
crate::uuid::register(db)?;
@@ -62,11 +68,12 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
6268
crate::view_admin::register(db)?;
6369
crate::checkpoint::register(db)?;
6470
crate::kv::register(db)?;
65-
sync::register(db)?;
71+
crate::state::register(db, state.clone())?;
72+
sync::register(db, state.clone())?;
6673

6774
crate::schema::register(db)?;
68-
crate::operations_vtab::register(db)?;
69-
crate::crud_vtab::register(db)?;
75+
crate::operations_vtab::register(db, state.clone())?;
76+
crate::crud_vtab::register(db, state)?;
7077

7178
Ok(())
7279
}

crates/core/src/operations_vtab.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
extern crate alloc;
22

33
use alloc::boxed::Box;
4+
use alloc::sync::Arc;
45
use core::ffi::{c_char, c_int, c_void};
56

67
use sqlite::{Connection, ResultCode, Value};
@@ -9,21 +10,23 @@ use sqlite_nostd as sqlite;
910
use crate::operations::{
1011
clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation,
1112
};
13+
use crate::state::DatabaseState;
1214
use crate::sync_local::sync_local;
1315
use crate::vtab_util::*;
1416

1517
#[repr(C)]
1618
struct VirtualTable {
1719
base: sqlite::vtab,
1820
db: *mut sqlite::sqlite3,
21+
state: Arc<DatabaseState>,
1922

2023
target_applied: bool,
2124
target_validated: bool,
2225
}
2326

2427
extern "C" fn connect(
2528
db: *mut sqlite::sqlite3,
26-
_aux: *mut c_void,
29+
aux: *mut c_void,
2730
_argc: c_int,
2831
_argv: *const *const c_char,
2932
vtab: *mut *mut sqlite::vtab,
@@ -43,6 +46,14 @@ extern "C" fn connect(
4346
zErrMsg: core::ptr::null_mut(),
4447
},
4548
db,
49+
state: {
50+
// Increase refcount - we can't use from_raw alone because we don't own the aux
51+
// data (connect could be called multiple times).
52+
let state = Arc::from_raw(aux as *mut DatabaseState);
53+
let clone = state.clone();
54+
core::mem::forget(state);
55+
clone
56+
},
4657
target_validated: false,
4758
target_applied: false,
4859
}));
@@ -83,7 +94,7 @@ extern "C" fn update(
8394
let result = insert_operation(db, args[3].text());
8495
vtab_result(vtab, result)
8596
} else if op == "sync_local" {
86-
let result = sync_local(db, &args[3]);
97+
let result = sync_local(&tab.state, db, &args[3]);
8798
if let Ok(result_row) = result {
8899
unsafe {
89100
*p_row_id = result_row;
@@ -139,8 +150,13 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
139150
xIntegrity: None,
140151
};
141152

142-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
143-
db.create_module_v2("powersync_operations", &MODULE, None, None)?;
153+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
154+
db.create_module_v2(
155+
"powersync_operations",
156+
&MODULE,
157+
Some(Arc::into_raw(state) as *mut c_void),
158+
Some(DatabaseState::destroy_arc),
159+
)?;
144160

145161
Ok(())
146162
}

crates/core/src/schema/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ use alloc::vec::Vec;
55
use serde::Deserialize;
66
use sqlite::ResultCode;
77
use sqlite_nostd as sqlite;
8-
pub use table_info::{DiffIncludeOld, Table, TableInfoFlags};
8+
pub use table_info::{
9+
DiffIncludeOld, PendingStatement, PendingStatementValue, RawTable, Table, TableInfoFlags,
10+
};
911

10-
#[derive(Deserialize)]
12+
#[derive(Deserialize, Default)]
1113
pub struct Schema {
12-
tables: Vec<table_info::Table>,
14+
pub tables: Vec<table_info::Table>,
15+
#[serde(default)]
16+
pub raw_tables: Vec<table_info::RawTable>,
1317
}
1418

1519
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {

crates/core/src/schema/table_info.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ pub struct Table {
1919
pub flags: TableInfoFlags,
2020
}
2121

22+
#[derive(Deserialize)]
23+
pub struct RawTable {
24+
pub name: String,
25+
pub put: PendingStatement,
26+
pub delete: PendingStatement,
27+
}
28+
2229
impl Table {
2330
pub fn from_json(text: &str) -> Result<Self, serde_json::Error> {
2431
serde_json::from_str(text)
@@ -229,3 +236,17 @@ impl<'de> Deserialize<'de> for TableInfoFlags {
229236
)
230237
}
231238
}
239+
240+
#[derive(Deserialize)]
241+
pub struct PendingStatement {
242+
pub sql: String,
243+
/// This vec should contain an entry for each parameter in [sql].
244+
pub params: Vec<PendingStatementValue>,
245+
}
246+
247+
#[derive(Deserialize)]
248+
pub enum PendingStatementValue {
249+
Id,
250+
Column(String),
251+
// TODO: Stuff like a raw object of put data?
252+
}

crates/core/src/state.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use core::{
2+
ffi::{c_int, c_void},
3+
sync::atomic::{AtomicBool, Ordering},
4+
};
5+
6+
use alloc::sync::Arc;
7+
use sqlite::{Connection, ResultCode};
8+
use sqlite_nostd::{self as sqlite, Context};
9+
10+
/// State that is shared for a SQLite database connection after the core extension has been
11+
/// registered on it.
12+
///
13+
/// `init_extension` allocates an instance of this in an `Arc` that is shared as user-data for
14+
/// functions/vtabs that need access to it.
15+
pub struct DatabaseState {
16+
pub is_in_sync_local: AtomicBool,
17+
}
18+
19+
impl DatabaseState {
20+
pub fn new() -> Self {
21+
DatabaseState {
22+
is_in_sync_local: AtomicBool::new(false),
23+
}
24+
}
25+
26+
pub fn sync_local_guard<'a>(&'a self) -> impl Drop + use<'a> {
27+
self.is_in_sync_local
28+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
29+
.expect("should not be syncing already");
30+
31+
struct ClearOnDrop<'a>(&'a DatabaseState);
32+
33+
impl Drop for ClearOnDrop<'_> {
34+
fn drop(&mut self) {
35+
self.0.is_in_sync_local.store(false, Ordering::Release);
36+
}
37+
}
38+
39+
ClearOnDrop(self)
40+
}
41+
42+
pub unsafe extern "C" fn destroy_arc(ptr: *mut c_void) {
43+
drop(Arc::from_raw(ptr.cast::<DatabaseState>()));
44+
}
45+
}
46+
47+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
48+
unsafe extern "C" fn func(
49+
ctx: *mut sqlite::context,
50+
_argc: c_int,
51+
_argv: *mut *mut sqlite::value,
52+
) {
53+
let data = ctx.user_data().cast::<DatabaseState>();
54+
let data = unsafe { data.as_ref() }.unwrap();
55+
56+
ctx.result_int(if data.is_in_sync_local.load(Ordering::Relaxed) {
57+
1
58+
} else {
59+
0
60+
});
61+
}
62+
63+
db.create_function_v2(
64+
"powersync_in_sync_operation",
65+
0,
66+
0,
67+
Some(Arc::into_raw(state) as *mut c_void),
68+
Some(func),
69+
None,
70+
None,
71+
Some(DatabaseState::destroy_arc),
72+
)?;
73+
Ok(())
74+
}

crates/core/src/sync/interface.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ use alloc::borrow::Cow;
55
use alloc::boxed::Box;
66
use alloc::rc::Rc;
77
use alloc::string::ToString;
8+
use alloc::sync::Arc;
89
use alloc::{string::String, vec::Vec};
910
use serde::{Deserialize, Serialize};
1011
use sqlite::{ResultCode, Value};
1112
use sqlite_nostd::{self as sqlite, ColumnType};
1213
use sqlite_nostd::{Connection, Context};
1314

1415
use crate::error::SQLiteError;
16+
use crate::schema::Schema;
17+
use crate::state::DatabaseState;
1518

1619
use super::streaming_sync::SyncClient;
1720
use super::sync_status::DownloadSyncStatus;
@@ -22,6 +25,8 @@ pub struct StartSyncStream {
2225
/// Bucket parameters to include in the request when opening a sync stream.
2326
#[serde(default)]
2427
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
28+
#[serde(default)]
29+
pub schema: Schema,
2530
}
2631

2732
/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
@@ -118,7 +123,7 @@ struct SqlController {
118123
client: SyncClient,
119124
}
120125

121-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
126+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
122127
extern "C" fn control(
123128
ctx: *mut sqlite::context,
124129
argc: c_int,
@@ -199,7 +204,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
199204
}
200205

201206
let controller = Box::new(SqlController {
202-
client: SyncClient::new(db),
207+
client: SyncClient::new(db, state),
203208
});
204209

205210
db.create_function_v2(

crates/core/src/sync/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use alloc::sync::Arc;
12
use sqlite_nostd::{self as sqlite, ResultCode};
23

34
mod bucket_priority;
@@ -13,6 +14,8 @@ mod sync_status;
1314
pub use bucket_priority::BucketPriority;
1415
pub use checksum::Checksum;
1516

16-
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
17-
interface::register(db)
17+
use crate::state::DatabaseState;
18+
19+
pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(), ResultCode> {
20+
interface::register(db, state)
1821
}

0 commit comments

Comments
 (0)