Skip to content

Expand support for update metadata and tracking old values #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 24, 2025
2 changes: 1 addition & 1 deletion .github/workflows/android.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
java-version: "17"

- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@ccb4328a959376b642e027874838f60f8e596de3
uses: gradle/actions/wrapper-validation@v4

- name: Setup
run: |
Expand Down
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "sqlite-rs-embedded"]
path = sqlite-rs-embedded
url = https://github.com/vlcn-io/sqlite-rs-embedded.git
url = https://github.com/powersync-ja/sqlite-rs-embedded.git
61 changes: 45 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ num-traits = { version = "0.2.15", default-features = false }
num-derive = "0.3"
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }

[dependencies.uuid]
version = "1.4.1"
Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use alloc::format;
use alloc::string::String;
use alloc::vec::Vec;
use core::ffi::c_int;
use core::slice;

use serde::{Deserialize, Serialize};
use serde_json as json;
Expand Down Expand Up @@ -59,9 +58,9 @@ GROUP BY bucket_list.bucket",
while statement.step()? == ResultCode::ROW {
let name = statement.column_text(0)?;
// checksums with column_int are wrapped to i32 by SQLite
let add_checksum = statement.column_int(1)?;
let oplog_checksum = statement.column_int(2)?;
let expected_checksum = statement.column_int(3)?;
let add_checksum = statement.column_int(1);
let oplog_checksum = statement.column_int(2);
let expected_checksum = statement.column_int(3);

// wrapping add is like +, but safely overflows
let checksum = oplog_checksum.wrapping_add(add_checksum);
Expand Down
29 changes: 16 additions & 13 deletions crates/core/src/crud_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ extern crate alloc;
use alloc::boxed::Box;
use alloc::string::String;
use core::ffi::{c_char, c_int, c_void};
use core::slice;

use sqlite::{Connection, ResultCode, Value};
use sqlite_nostd as sqlite;
Expand All @@ -29,7 +28,7 @@ struct VirtualTable {
base: sqlite::vtab,
db: *mut sqlite::sqlite3,
current_tx: Option<i64>,
insert_statement: Option<ManagedStmt>
insert_statement: Option<ManagedStmt>,
}

extern "C" fn connect(
Expand All @@ -40,8 +39,7 @@ extern "C" fn connect(
vtab: *mut *mut sqlite::vtab,
_err: *mut *mut c_char,
) -> c_int {
if let Err(rc) = sqlite::declare_vtab(db, "CREATE TABLE powersync_crud_(data TEXT);")
{
if let Err(rc) = sqlite::declare_vtab(db, "CREATE TABLE powersync_crud_(data TEXT);") {
return rc as c_int;
}

Expand All @@ -54,7 +52,7 @@ extern "C" fn connect(
},
db,
current_tx: None,
insert_statement: None
insert_statement: None,
}));
*vtab = tab.cast::<sqlite::vtab>();
let _ = sqlite::vtab_config(db, 0);
Expand All @@ -69,17 +67,17 @@ extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
ResultCode::OK as c_int
}


fn begin_impl(tab: &mut VirtualTable) -> Result<(), SQLiteError> {
let db = tab.db;

let insert_statement = db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?1, ?2)", 0)?;
tab.insert_statement = Some(insert_statement);

// language=SQLite
let statement = db.prepare_v2("UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx")?;
let statement =
db.prepare_v2("UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx")?;
if statement.step()? == ResultCode::ROW {
let tx_id = statement.column_int64(0)? - 1;
let tx_id = statement.column_int64(0) - 1;
tab.current_tx = Some(tx_id);
} else {
return Err(SQLiteError::from(ResultCode::ABORT));
Expand Down Expand Up @@ -109,23 +107,27 @@ extern "C" fn rollback(vtab: *mut sqlite::vtab) -> c_int {
ResultCode::OK as c_int
}

fn insert_operation(
vtab: *mut sqlite::vtab, data: &str) -> Result<(), SQLiteError> {
fn insert_operation(vtab: *mut sqlite::vtab, data: &str) -> Result<(), SQLiteError> {
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
if tab.current_tx.is_none() {
return Err(SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))));
return Err(SQLiteError(
ResultCode::MISUSE,
Some(String::from("No tx_id")),
));
}
let current_tx = tab.current_tx.unwrap();
// language=SQLite
let statement = tab.insert_statement.as_ref().ok_or(SQLiteError::from(NULL))?;
let statement = tab
.insert_statement
.as_ref()
.ok_or(SQLiteError::from(NULL))?;
statement.bind_int64(1, current_tx)?;
statement.bind_text(2, data, sqlite::Destructor::STATIC)?;
statement.exec()?;

Ok(())
}


extern "C" fn update(
vtab: *mut sqlite::vtab,
argc: c_int,
Expand Down Expand Up @@ -178,6 +180,7 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
xRelease: None,
xRollbackTo: None,
xShadowName: None,
xIntegrity: None,
};

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/json_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ extern crate alloc;
use alloc::format;
use alloc::string::{String, ToString};
use core::ffi::c_int;
use core::slice;

use sqlite::ResultCode;
use sqlite_nostd as sqlite;
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ extern crate alloc;
use alloc::format;
use alloc::string::{String, ToString};
use core::ffi::c_int;
use core::slice;

use sqlite::ResultCode;
use sqlite_nostd as sqlite;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod macros;
mod migrations;
mod operations;
mod operations_vtab;
mod schema_management;
mod schema;
mod sync_local;
mod sync_types;
mod util;
Expand Down Expand Up @@ -62,7 +62,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
crate::checkpoint::register(db)?;
crate::kv::register(db)?;

crate::schema_management::register(db)?;
crate::schema::register(db)?;
crate::operations_vtab::register(db)?;
crate::crud_vtab::register(db)?;

Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
return Err(SQLiteError::from(ResultCode::ABORT));
}

let mut current_version = current_version_stmt.column_int(0)?;
let mut current_version = current_version_stmt.column_int(0);

while current_version > target_version {
// Run down migrations.
Expand Down Expand Up @@ -78,7 +78,7 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
Some("Down migration failed - could not get version".to_string()),
));
}
let new_version = current_version_stmt.column_int(0)?;
let new_version = current_version_stmt.column_int(0);
if new_version >= current_version {
// Database down from version $currentVersion to $version failed - version not updated after dow migration
return Err(SQLiteError(
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ FROM json_each(?) e",
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
bucket_statement.step()?;

let bucket_id = bucket_statement.column_int64(0)?;
let bucket_id = bucket_statement.column_int64(0);

// This is an optimization for initial sync - we can avoid persisting individual REMOVE
// operations when last_applied_op = 0.
// We do still need to do the "supersede_statement" step for this case, since a REMOVE
// operation can supersede another PUT operation we're syncing at the same time.
let mut is_empty = bucket_statement.column_int64(1)? == 0;
let mut is_empty = bucket_statement.column_int64(1) == 0;

// Statement to supersede (replace) operations with the same key.
// language=SQLite
Expand Down Expand Up @@ -106,11 +106,11 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
let mut added_ops: i32 = 0;

while iterate_statement.step()? == ResultCode::ROW {
let op_id = iterate_statement.column_int64(0)?;
let op_id = iterate_statement.column_int64(0);
let op = iterate_statement.column_text(1)?;
let object_type = iterate_statement.column_text(2);
let object_id = iterate_statement.column_text(3);
let checksum = iterate_statement.column_int(4)?;
let checksum = iterate_statement.column_int(4);
let op_data = iterate_statement.column_text(5);

last_op = Some(op_id);
Expand All @@ -131,7 +131,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",

while supersede_statement.step()? == ResultCode::ROW {
// Superseded (deleted) a previous operation, add the checksum
let supersede_checksum = supersede_statement.column_int(1)?;
let supersede_checksum = supersede_statement.column_int(1);
add_checksum = add_checksum.wrapping_add(supersede_checksum);
op_checksum = op_checksum.wrapping_sub(supersede_checksum);

Expand Down Expand Up @@ -272,7 +272,7 @@ pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteE
statement.bind_text(1, name, sqlite::Destructor::STATIC)?;

if statement.step()? == ResultCode::ROW {
let bucket_id = statement.column_int64(0)?;
let bucket_id = statement.column_int64(0);

// language=SQLite
let updated_statement = db.prepare_v2(
Expand Down
Loading