Skip to content

Commit 1e76045

Browse files
authored
Merge pull request #91 from powersync-ja/crud-vtab-only-update-local-once
Crud vtab optimizations
2 parents 5253af8 + 0583385 commit 1e76045

File tree

2 files changed

+131
-41
lines changed

2 files changed

+131
-41
lines changed

crates/core/src/crud_vtab.rs

Lines changed: 86 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,20 @@ struct ActiveCrudTransaction {
4949
}
5050

5151
enum CrudTransactionMode {
52-
Manual {
53-
stmt: ManagedStmt,
54-
},
55-
Simple {
56-
stmt: ManagedStmt,
57-
set_updated_rows: ManagedStmt,
58-
update_local_bucket: ManagedStmt,
59-
},
52+
Manual(ManualCrudTransactionMode),
53+
Simple(SimpleCrudTransactionMode),
54+
}
55+
56+
#[derive(Default)]
57+
struct ManualCrudTransactionMode {
58+
stmt: Option<ManagedStmt>,
59+
}
60+
61+
#[derive(Default)]
62+
struct SimpleCrudTransactionMode {
63+
stmt: Option<ManagedStmt>,
64+
set_updated_rows: Option<ManagedStmt>,
65+
had_writes: bool,
6066
}
6167

6268
impl VirtualTable {
@@ -73,31 +79,29 @@ impl VirtualTable {
7379
}
7480
}
7581

76-
fn handle_insert(&self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> {
82+
fn handle_insert(&mut self, args: &[*mut sqlite::value]) -> Result<(), SQLiteError> {
7783
let current_tx = self
7884
.current_tx
79-
.as_ref()
85+
.as_mut()
8086
.ok_or_else(|| SQLiteError(ResultCode::MISUSE, Some(String::from("No tx_id"))))?;
87+
let db = self.db;
8188

82-
match &current_tx.mode {
83-
CrudTransactionMode::Manual { stmt } => {
89+
match &mut current_tx.mode {
90+
CrudTransactionMode::Manual(manual) => {
8491
// Columns are (data TEXT, options INT HIDDEN)
8592
let data = args[0].text();
8693
let flags = match args[1].value_type() {
8794
sqlite_nostd::ColumnType::Null => TableInfoFlags::default(),
8895
_ => TableInfoFlags(args[1].int() as u32),
8996
};
9097

98+
let stmt = manual.raw_crud_statement(db)?;
9199
stmt.bind_int64(1, current_tx.tx_id)?;
92100
stmt.bind_text(2, data, sqlite::Destructor::STATIC)?;
93101
stmt.bind_int(3, flags.0 as i32)?;
94102
stmt.exec()?;
95103
}
96-
CrudTransactionMode::Simple {
97-
stmt,
98-
set_updated_rows,
99-
update_local_bucket,
100-
} => {
104+
CrudTransactionMode::Simple(simple) => {
101105
// Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN)
102106
let flags = match args[6].value_type() {
103107
sqlite_nostd::ColumnType::Null => TableInfoFlags::default(),
@@ -133,6 +137,7 @@ impl VirtualTable {
133137

134138
// First, we insert into ps_crud like the manual vtab would too. We have to create
135139
// the JSON out of the individual components for that.
140+
let stmt = simple.raw_crud_statement(db)?;
136141
stmt.bind_int64(1, current_tx.tx_id)?;
137142

138143
let serialized = serde_json::to_string(&CrudEntry {
@@ -151,10 +156,11 @@ impl VirtualTable {
151156
stmt.exec()?;
152157

153158
// However, we also set ps_updated_rows and update the $local bucket
159+
let set_updated_rows = simple.set_updated_rows_statement(db)?;
154160
set_updated_rows.bind_text(1, row_type, sqlite::Destructor::STATIC)?;
155161
set_updated_rows.bind_text(2, id, sqlite::Destructor::STATIC)?;
156162
set_updated_rows.exec()?;
157-
update_local_bucket.exec()?;
163+
simple.record_local_write(db)?;
158164
}
159165
}
160166

@@ -176,39 +182,78 @@ impl VirtualTable {
176182
self.current_tx = Some(ActiveCrudTransaction {
177183
tx_id,
178184
mode: if self.is_simple {
179-
CrudTransactionMode::Simple {
180-
// language=SQLite
181-
stmt: db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)?,
182-
// language=SQLite
183-
set_updated_rows: db.prepare_v3(
184-
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
185-
0,
186-
)?,
187-
update_local_bucket: db.prepare_v3(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"), 0)?,
188-
}
185+
CrudTransactionMode::Simple(Default::default())
189186
} else {
190-
const SQL: &str = formatcp!(
191-
"\
187+
CrudTransactionMode::Manual(Default::default())
188+
},
189+
});
190+
191+
Ok(())
192+
}
193+
194+
fn end_transaction(&mut self) {
195+
self.current_tx = None;
196+
}
197+
}
198+
199+
impl ManualCrudTransactionMode {
200+
fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> {
201+
prepare_lazy(&mut self.stmt, || {
202+
const SQL: &str = formatcp!(
203+
"\
192204
WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
193205
INSERT INTO ps_crud(tx_id, data)
194206
SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
195207
",
196-
TableInfoFlags::IGNORE_EMPTY_UPDATE
197-
);
208+
TableInfoFlags::IGNORE_EMPTY_UPDATE
209+
);
198210

199-
let insert_statement = db.prepare_v3(SQL, 0)?;
200-
CrudTransactionMode::Manual {
201-
stmt: insert_statement,
202-
}
203-
},
204-
});
211+
db.prepare_v3(SQL, 0)
212+
})
213+
}
214+
}
215+
216+
impl SimpleCrudTransactionMode {
217+
fn raw_crud_statement(&mut self, db: *mut sqlite::sqlite3) -> Result<&ManagedStmt, ResultCode> {
218+
prepare_lazy(&mut self.stmt, || {
219+
// language=SQLite
220+
db.prepare_v3("INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)", 0)
221+
})
222+
}
223+
224+
fn set_updated_rows_statement(
225+
&mut self,
226+
db: *mut sqlite::sqlite3,
227+
) -> Result<&ManagedStmt, ResultCode> {
228+
prepare_lazy(&mut self.set_updated_rows, || {
229+
// language=SQLite
230+
db.prepare_v3(
231+
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)",
232+
0,
233+
)
234+
})
235+
}
236+
237+
fn record_local_write(&mut self, db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
238+
if !self.had_writes {
239+
db.exec_safe(formatcp!("INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})"))?;
240+
self.had_writes = true;
241+
}
205242

206243
Ok(())
207244
}
245+
}
208246

209-
fn end_transaction(&mut self) {
210-
self.current_tx = None;
247+
/// A variant of `Option.get_or_insert` that handles insertions returning errors.
248+
fn prepare_lazy(
249+
stmt: &mut Option<ManagedStmt>,
250+
prepare: impl FnOnce() -> Result<ManagedStmt, ResultCode>,
251+
) -> Result<&ManagedStmt, ResultCode> {
252+
if let None = stmt {
253+
*stmt = Some(prepare()?);
211254
}
255+
256+
return Ok(unsafe { stmt.as_ref().unwrap_unchecked() });
212257
}
213258

214259
extern "C" fn connect(
@@ -295,7 +340,7 @@ extern "C" fn update(
295340
ResultCode::MISUSE as c_int
296341
} else if rowid.value_type() == sqlite::ColumnType::Null {
297342
// INSERT
298-
let tab = unsafe { &*(vtab.cast::<VirtualTable>()) };
343+
let tab = unsafe { &mut *(vtab.cast::<VirtualTable>()) };
299344
let result = tab.handle_insert(&args[2..]);
300345
vtab_result(vtab, result)
301346
} else {

dart/test/crud_test.dart

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,51 @@ void main() {
322322
'{"op":"PUT","id":"foo","type":"users","data":{"my":"value"},"old":{"previous":"value"}}',
323323
});
324324
});
325+
326+
test('resets state after commit', () {
327+
db.execute('BEGIN');
328+
db.execute(
329+
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
330+
'DELETE',
331+
'foo',
332+
'users',
333+
]);
334+
db.execute('commit');
335+
336+
db.execute(
337+
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
338+
'DELETE',
339+
'foo',
340+
'users',
341+
]);
342+
expect(db.select('SELECT * FROM ps_crud').map((r) => r['tx_id']),
343+
[1, 2]);
344+
});
345+
346+
test('resets state after rollback', () {
347+
db.execute('BEGIN');
348+
db.execute(
349+
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
350+
'DELETE',
351+
'foo',
352+
'users',
353+
]);
354+
db.execute('rollback');
355+
356+
db.execute(
357+
'INSERT INTO powersync_crud (op, id, type) VALUES (?, ?, ?)', [
358+
'DELETE',
359+
'foo2',
360+
'users',
361+
]);
362+
expect(db.select('SELECT * FROM ps_crud'), [
363+
{
364+
'id': 1,
365+
'data': '{"op":"DELETE","id":"foo2","type":"users"}',
366+
'tx_id': 1,
367+
}
368+
]);
369+
});
325370
});
326371
});
327372

0 commit comments

Comments
 (0)