Skip to content

Commit c3950ab

Browse files
committed
Start tracking checkpoints
1 parent 2c5f617 commit c3950ab

21 files changed

+650
-148
lines changed

Cargo.lock

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ default-members = ["crates/shell", "crates/sqlite"]
99

1010
[profile.dev]
1111
panic = "abort"
12-
strip = true
12+
#strip = true
1313

1414
[profile.release]
1515
panic = "abort"

crates/core/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ bytes = { version = "1.4", default-features = false }
1818
num-traits = { version = "0.2.15", default-features = false }
1919
num-derive = "0.3"
2020
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
21-
serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
21+
serde = { version = "1.0", default-features = false, features = ["alloc", "derive", "rc"] }
2222
streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }
2323
lock_api = { version = "0.4.12", default-features = false }
24-
futures-lite = { version = "2.6.0", default-features = false }
24+
futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] }
25+
rustc-hash = { version = "2.1", default-features = false }
2526

2627
[dependencies.uuid]
2728
version = "1.4.1"

crates/core/src/bson/error.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use core::fmt::Display;
22

33
use alloc::{
44
boxed::Box,
5-
format,
65
string::{String, ToString},
76
};
87
use serde::de::{self, StdError};

crates/core/src/kv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ use sqlite::ResultCode;
88
use sqlite_nostd as sqlite;
99
use sqlite_nostd::{Connection, Context};
1010

11-
use crate::bucket_priority::BucketPriority;
1211
use crate::create_sqlite_optional_text_fn;
1312
use crate::create_sqlite_text_fn;
1413
use crate::error::SQLiteError;
14+
use crate::sync::bucket_priority::BucketPriority;
1515

1616
fn powersync_client_id_impl(
1717
ctx: *mut sqlite::context,

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use sqlite::ResultCode;
1313
use sqlite_nostd as sqlite;
1414

1515
mod bson;
16-
mod bucket_priority;
1716
mod checkpoint;
1817
mod crud_vtab;
1918
mod diff;
@@ -62,6 +61,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
6261
crate::view_admin::register(db)?;
6362
crate::checkpoint::register(db)?;
6463
crate::kv::register(db)?;
64+
sync::register(db)?;
6565

6666
crate::schema::register(db)?;
6767
crate::operations_vtab::register(db)?;

crates/core/src/macros.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,7 @@ macro_rules! create_sqlite_text_fn {
1111
let result = $fn_impl_name(ctx, args);
1212

1313
if let Err(err) = result {
14-
let SQLiteError(code, message) = SQLiteError::from(err);
15-
if message.is_some() {
16-
ctx.result_error(&format!("{:} {:}", $description, message.unwrap()));
17-
} else {
18-
let error = ctx.db_handle().errmsg().unwrap();
19-
if error == "not an error" {
20-
ctx.result_error(&format!("{:}", $description));
21-
} else {
22-
ctx.result_error(&format!("{:} {:}", $description, error));
23-
}
24-
}
25-
ctx.result_error_code(code);
14+
crate::util::context_set_error(ctx, SQLiteError::from(err), $description);
2615
} else if let Ok(r) = result {
2716
ctx.result_text_transient(&r);
2817
}
@@ -43,18 +32,7 @@ macro_rules! create_sqlite_optional_text_fn {
4332
let result = $fn_impl_name(ctx, args);
4433

4534
if let Err(err) = result {
46-
let SQLiteError(code, message) = SQLiteError::from(err);
47-
if message.is_some() {
48-
ctx.result_error(&format!("{:} {:}", $description, message.unwrap()));
49-
} else {
50-
let error = ctx.db_handle().errmsg().unwrap();
51-
if error == "not an error" {
52-
ctx.result_error(&format!("{:}", $description));
53-
} else {
54-
ctx.result_error(&format!("{:} {:}", $description, error));
55-
}
56-
}
57-
ctx.result_error_code(code);
35+
crate::util::context_set_error(ctx, SQLiteError::from(err), $description);
5836
} else if let Ok(r) = result {
5937
if let Some(s) = r {
6038
ctx.result_text_transient(&s);

crates/core/src/migrations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use sqlite::ResultCode;
88
use sqlite_nostd as sqlite;
99
use sqlite_nostd::{Connection, Context};
1010

11-
use crate::bucket_priority::BucketPriority;
1211
use crate::error::{PSResult, SQLiteError};
1312
use crate::fix035::apply_v035_fix;
13+
use crate::sync::bucket_priority::BucketPriority;
1414

1515
pub fn powersync_migrate(
1616
ctx: *mut sqlite::context,

crates/core/src/bucket_priority.rs renamed to crates/core/src/sync/bucket_priority.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use serde::{de::Visitor, Deserialize};
1+
use serde::{de::Visitor, Deserialize, Serialize};
22
use sqlite_nostd::ResultCode;
33

44
use crate::error::SQLiteError;
55

66
#[repr(transparent)]
7-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
7+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
88
pub struct BucketPriority {
99
pub number: i32,
1010
}
@@ -14,6 +14,9 @@ impl BucketPriority {
1414
self == BucketPriority::HIGHEST
1515
}
1616

17+
/// The priority to use when the sync service doesn't attach priorities in checkpoints.
18+
pub const FALLBACK: BucketPriority = BucketPriority { number: 3 };
19+
1720
pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };
1821

1922
/// A low priority used to represent fully-completed sync operations across all priorities.
@@ -87,3 +90,12 @@ impl<'de> Deserialize<'de> for BucketPriority {
8790
deserializer.deserialize_i32(PriorityVisitor)
8891
}
8992
}
93+
94+
impl Serialize for BucketPriority {
95+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
96+
where
97+
S: serde::Serializer,
98+
{
99+
serializer.serialize_i32(self.number)
100+
}
101+
}

crates/core/src/sync/interface.rs

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,50 @@
1+
use core::cell::RefCell;
2+
use core::ffi::{c_int, c_void};
3+
4+
use alloc::boxed::Box;
5+
use alloc::rc::Rc;
6+
use alloc::string::ToString;
17
use alloc::{string::String, vec::Vec};
28
use serde::Serialize;
9+
use sqlite::{ResultCode, Value};
10+
use sqlite_nostd::{self as sqlite, ColumnType};
11+
use sqlite_nostd::{Connection, Context};
12+
13+
use crate::error::SQLiteError;
14+
use crate::util::context_set_error;
15+
16+
use super::streaming_sync::SyncClient;
17+
use super::sync_status::DownloadSyncStatus;
18+
19+
pub enum SyncControlRequest<'a> {
20+
StartSyncStream {
21+
parameters: Option<serde_json::Map<String, serde_json::Value>>,
22+
},
23+
StopSyncStream,
24+
SyncEvent(SyncEvent<'a>),
25+
}
326

427
pub enum SyncEvent<'a> {
5-
StartSyncStream,
6-
SyncStreamClosed { error: bool },
28+
Initialize,
29+
TearDown,
730
TextLine { data: &'a str },
831
BinaryLine { data: &'a [u8] },
932
}
1033

1134
/// An instruction sent by the core extension to the SDK.
1235
#[derive(Serialize)]
1336
pub enum Instruction {
14-
LogLine { severity: LogSeverity, line: String },
15-
EstablishSyncStream { request: StreamingSyncRequest },
37+
LogLine {
38+
severity: LogSeverity,
39+
line: String,
40+
},
41+
UpdateSyncStatus {
42+
status: Rc<RefCell<DownloadSyncStatus>>,
43+
},
44+
EstablishSyncStream {
45+
request: StreamingSyncRequest,
46+
},
47+
FlushFileSystem,
1648
CloseSyncStream,
1749
}
1850

@@ -37,3 +69,81 @@ pub struct BucketRequest {
3769
pub name: String,
3870
pub after: String,
3971
}
72+
73+
struct SqlController {
74+
client: SyncClient,
75+
}
76+
77+
pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
78+
extern "C" fn control(
79+
ctx: *mut sqlite::context,
80+
argc: c_int,
81+
argv: *mut *mut sqlite::value,
82+
) -> () {
83+
let result = (|| -> Result<(), SQLiteError> {
84+
let controller = unsafe { ctx.user_data().cast::<SqlController>().as_ref() }
85+
.ok_or_else(|| SQLiteError::from(ResultCode::INTERNAL))?;
86+
87+
let args = sqlite::args!(argc, argv);
88+
let [op, payload] = args else {
89+
return Err(ResultCode::MISUSE.into());
90+
};
91+
92+
if op.value_type() != ColumnType::Text {
93+
return Err(SQLiteError(
94+
ResultCode::MISUSE,
95+
Some("First argument must be a string".to_string()),
96+
));
97+
}
98+
99+
let op = op.text();
100+
let event = match op {
101+
"start" => SyncControlRequest::StartSyncStream {
102+
parameters: if payload.value_type() == ColumnType::Text {
103+
Some(serde_json::from_str(payload.text())?)
104+
} else {
105+
None
106+
},
107+
},
108+
"stop" => SyncControlRequest::StopSyncStream,
109+
_ => {
110+
return Err(SQLiteError(
111+
ResultCode::MISUSE,
112+
Some("Unknown operation".to_string()),
113+
))
114+
}
115+
};
116+
117+
let instructions = controller.client.push_event(event)?;
118+
let formatted = serde_json::to_string(&instructions)?;
119+
ctx.result_text_transient(&formatted);
120+
121+
Ok(())
122+
})();
123+
124+
if let Err(e) = result {
125+
context_set_error(ctx, e, "powersync_control");
126+
}
127+
}
128+
129+
unsafe extern "C" fn destroy(ptr: *mut c_void) {
130+
drop(Box::from_raw(ptr.cast::<SqlController>()));
131+
}
132+
133+
let controller = Box::new(SqlController {
134+
client: SyncClient::new(db),
135+
});
136+
137+
db.create_function_v2(
138+
"powersync_control",
139+
2,
140+
sqlite::UTF8 | sqlite::DIRECTONLY,
141+
Some(Box::into_raw(controller).cast()),
142+
Some(control),
143+
None,
144+
None,
145+
Some(destroy),
146+
)?;
147+
148+
Ok(())
149+
}

0 commit comments

Comments
 (0)