@@ -3,23 +3,33 @@ extern crate alloc;
33use alloc:: boxed:: Box ;
44use alloc:: string:: String ;
55use const_format:: formatcp;
6- use core:: ffi:: { c_char, c_int, c_void} ;
6+ use core:: ffi:: { c_char, c_int, c_void, CStr } ;
7+ use core:: ptr:: null_mut;
8+ use serde:: Serialize ;
9+ use serde_json:: value:: RawValue ;
710
811use sqlite:: { Connection , ResultCode , Value } ;
9- use sqlite_nostd as sqlite;
1012use sqlite_nostd:: ManagedStmt ;
11- use sqlite_nostd:: ResultCode :: NULL ;
13+ use sqlite_nostd:: { self as sqlite , ColumnType } ;
1214
1315use crate :: error:: SQLiteError ;
1416use crate :: ext:: SafeManagedStmt ;
1517use crate :: schema:: TableInfoFlags ;
18+ use crate :: util:: MAX_OP_ID ;
1619use crate :: vtab_util:: * ;
1720
21+ const MANUAL_NAME : & CStr = c"powersync_crud_" ;
22+ const SIMPLE_NAME : & CStr = c"powersync_crud" ;
23+
1824// Structure:
1925// CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);
26+ // CREATE TABLE powersync_crud(op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN);
2027//
2128// This is a insert-only virtual table. It generates transaction ids in ps_tx, and inserts data in
2229// ps_crud(tx_id, data).
30+ // The second form (without the trailing underscore) takes the data to insert as individual
31+ // components and constructs the data to insert into `ps_crud` internally. It will also update
32+ // `ps_updated_rows` and the `$local` bucket.
2333//
2434// Using a virtual table like this allows us to hook into xBegin, xCommit and xRollback to automatically
2535// increment transaction ids. These are only called when powersync_crud_ is used as part of a transaction,
@@ -29,22 +39,201 @@ use crate::vtab_util::*;
2939struct VirtualTable {
3040 base : sqlite:: vtab ,
3141 db : * mut sqlite:: sqlite3 ,
32- current_tx : Option < i64 > ,
33- insert_statement : Option < ManagedStmt > ,
42+ current_tx : Option < ActiveCrudTransaction > ,
43+ is_simple : bool ,
44+ }
45+
46+ struct ActiveCrudTransaction {
47+ tx_id : i64 ,
48+ mode : CrudTransactionMode ,
49+ }
50+
51+ enum CrudTransactionMode {
52+ Manual {
53+ stmt : ManagedStmt ,
54+ } ,
55+ Simple {
56+ stmt : ManagedStmt ,
57+ set_updated_rows : ManagedStmt ,
58+ update_local_bucket : ManagedStmt ,
59+ } ,
60+ }
61+
62+ impl VirtualTable {
63+ fn value_to_json < ' a > ( value : & ' a * mut sqlite:: value ) -> Option < & ' a RawValue > {
64+ match value. value_type ( ) {
65+ ColumnType :: Text => {
66+ Some ( unsafe {
67+ // Safety: RawValue is a transparent type wrapping a str. We assume that it
68+ // contains valid JSON.
69+ core:: mem:: transmute :: < & ' a str , & ' a RawValue > ( value. text ( ) )
70+ } )
71+ }
72+ _ => None ,
73+ }
74+ }
75+
76+ fn handle_insert ( & self , args : & [ * mut sqlite:: value ] ) -> Result < ( ) , SQLiteError > {
77+ let current_tx = self
78+ . current_tx
79+ . as_ref ( )
80+ . ok_or_else ( || SQLiteError ( ResultCode :: MISUSE , Some ( String :: from ( "No tx_id" ) ) ) ) ?;
81+
82+ match & current_tx. mode {
83+ CrudTransactionMode :: Manual { stmt } => {
84+ // Columns are (data TEXT, options INT HIDDEN)
85+ let data = args[ 0 ] . text ( ) ;
86+ let flags = match args[ 1 ] . value_type ( ) {
87+ sqlite_nostd:: ColumnType :: Null => TableInfoFlags :: default ( ) ,
88+ _ => TableInfoFlags ( args[ 1 ] . int ( ) as u32 ) ,
89+ } ;
90+
91+ stmt. bind_int64 ( 1 , current_tx. tx_id ) ?;
92+ stmt. bind_text ( 2 , data, sqlite:: Destructor :: STATIC ) ?;
93+ stmt. bind_int ( 3 , flags. 0 as i32 ) ?;
94+ stmt. exec ( ) ?;
95+ }
96+ CrudTransactionMode :: Simple {
97+ stmt,
98+ set_updated_rows,
99+ update_local_bucket,
100+ } => {
101+ // Columns are (op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN)
102+ let flags = match args[ 6 ] . value_type ( ) {
103+ sqlite_nostd:: ColumnType :: Null => TableInfoFlags :: default ( ) ,
104+ _ => TableInfoFlags ( args[ 6 ] . int ( ) as u32 ) ,
105+ } ;
106+ let op = args[ 0 ] . text ( ) ;
107+ let id = args[ 1 ] . text ( ) ;
108+ let row_type = args[ 2 ] . text ( ) ;
109+ let metadata = args[ 5 ] ;
110+ let data = Self :: value_to_json ( & args[ 3 ] ) ;
111+
112+ if flags. ignore_empty_update ( )
113+ && op == "PATCH"
114+ && data. map ( |r| r. get ( ) ) == Some ( "{}" )
115+ {
116+ // Ignore this empty update
117+ return Ok ( ( ) ) ;
118+ }
119+
120+ #[ derive( Serialize ) ]
121+ struct CrudEntry < ' a > {
122+ op : & ' a str ,
123+ id : & ' a str ,
124+ #[ serde( rename = "type" ) ]
125+ row_type : & ' a str ,
126+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
127+ data : Option < & ' a RawValue > ,
128+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
129+ old : Option < & ' a RawValue > ,
130+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
131+ metadata : Option < & ' a str > ,
132+ }
133+
134+ // First, we insert into ps_crud like the manual vtab would too. We have to create
135+ // the JSON out of the individual components for that.
136+ stmt. bind_int64 ( 1 , current_tx. tx_id ) ?;
137+
138+ let serialized = serde_json:: to_string ( & CrudEntry {
139+ op,
140+ id,
141+ row_type,
142+ data : data,
143+ old : Self :: value_to_json ( & args[ 4 ] ) ,
144+ metadata : if metadata. value_type ( ) == ColumnType :: Text {
145+ Some ( metadata. text ( ) )
146+ } else {
147+ None
148+ } ,
149+ } ) ?;
150+ stmt. bind_text ( 2 , & serialized, sqlite:: Destructor :: STATIC ) ?;
151+ stmt. exec ( ) ?;
152+
153+ // However, we also set ps_updated_rows and update the $local bucket
154+ set_updated_rows. bind_text ( 1 , row_type, sqlite:: Destructor :: STATIC ) ?;
155+ set_updated_rows. bind_text ( 2 , id, sqlite:: Destructor :: STATIC ) ?;
156+ set_updated_rows. exec ( ) ?;
157+ update_local_bucket. exec ( ) ?;
158+ }
159+ }
160+
161+ Ok ( ( ) )
162+ }
163+
164+ fn begin ( & mut self ) -> Result < ( ) , SQLiteError > {
165+ let db = self . db ;
166+
167+ // language=SQLite
168+ let statement =
169+ db. prepare_v2 ( "UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx" ) ?;
170+ let tx_id = if statement. step ( ) ? == ResultCode :: ROW {
171+ statement. column_int64 ( 0 ) - 1
172+ } else {
173+ return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
174+ } ;
175+
176+ self . current_tx = Some ( ActiveCrudTransaction {
177+ tx_id,
178+ mode : if self . is_simple {
179+ CrudTransactionMode :: Simple {
180+ // language=SQLite
181+ stmt : db. prepare_v3 ( "INSERT INTO ps_crud(tx_id, data) VALUES (?, ?)" , 0 ) ?,
182+ // language=SQLite
183+ set_updated_rows : db. prepare_v3 (
184+ "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?, ?)" ,
185+ 0 ,
186+ ) ?,
187+ update_local_bucket : db. prepare_v3 ( formatcp ! ( "INSERT OR REPLACE INTO ps_buckets(name, last_op, target_op) VALUES('$local', 0, {MAX_OP_ID})" ) , 0 ) ?,
188+ }
189+ } else {
190+ const SQL : & str = formatcp ! (
191+ "\
192+ WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
193+ INSERT INTO ps_crud(tx_id, data)
194+ SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
195+ " ,
196+ TableInfoFlags :: IGNORE_EMPTY_UPDATE
197+ ) ;
198+
199+ let insert_statement = db. prepare_v3 ( SQL , 0 ) ?;
200+ CrudTransactionMode :: Manual {
201+ stmt : insert_statement,
202+ }
203+ } ,
204+ } ) ;
205+
206+ Ok ( ( ) )
207+ }
208+
209+ fn end_transaction ( & mut self ) {
210+ self . current_tx = None ;
211+ }
34212}
35213
36214extern "C" fn connect (
37215 db : * mut sqlite:: sqlite3 ,
38216 _aux : * mut c_void ,
39- _argc : c_int ,
40- _argv : * const * const c_char ,
217+ argc : c_int ,
218+ argv : * const * const c_char ,
41219 vtab : * mut * mut sqlite:: vtab ,
42220 _err : * mut * mut c_char ,
43221) -> c_int {
44- if let Err ( rc) = sqlite:: declare_vtab (
45- db,
46- "CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);" ,
47- ) {
222+ let args = sqlite:: args!( argc, argv) ;
223+ let Some ( name) = args. get ( 0 ) else {
224+ return ResultCode :: MISUSE as c_int ;
225+ } ;
226+
227+ let name = unsafe { CStr :: from_ptr ( * name) } ;
228+ let is_simple = name == SIMPLE_NAME ;
229+
230+ let sql = if is_simple {
231+ "CREATE TABLE powersync_crud(op TEXT, id TEXT, type TEXT, data TEXT, old_values TEXT, metadata TEXT, options INT HIDDEN);"
232+ } else {
233+ "CREATE TABLE powersync_crud_(data TEXT, options INT HIDDEN);"
234+ } ;
235+
236+ if let Err ( rc) = sqlite:: declare_vtab ( db, sql) {
48237 return rc as c_int ;
49238 }
50239
@@ -57,7 +246,7 @@ extern "C" fn connect(
57246 } ,
58247 db,
59248 current_tx : None ,
60- insert_statement : None ,
249+ is_simple ,
61250 } ) ) ;
62251 * vtab = tab. cast :: < sqlite:: vtab > ( ) ;
63252 let _ = sqlite:: vtab_config ( db, 0 ) ;
@@ -72,81 +261,25 @@ extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
72261 ResultCode :: OK as c_int
73262}
74263
75- fn begin_impl ( tab : & mut VirtualTable ) -> Result < ( ) , SQLiteError > {
76- let db = tab. db ;
77-
78- const SQL : & str = formatcp ! (
79- "\
80- WITH insertion (tx_id, data) AS (VALUES (?1, ?2))
81- INSERT INTO ps_crud(tx_id, data)
82- SELECT * FROM insertion WHERE (NOT (?3 & {})) OR data->>'op' != 'PATCH' OR data->'data' != '{{}}';
83- " ,
84- TableInfoFlags :: IGNORE_EMPTY_UPDATE
85- ) ;
86-
87- let insert_statement = db. prepare_v3 ( SQL , 0 ) ?;
88- tab. insert_statement = Some ( insert_statement) ;
89-
90- // language=SQLite
91- let statement =
92- db. prepare_v2 ( "UPDATE ps_tx SET next_tx = next_tx + 1 WHERE id = 1 RETURNING next_tx" ) ?;
93- if statement. step ( ) ? == ResultCode :: ROW {
94- let tx_id = statement. column_int64 ( 0 ) - 1 ;
95- tab. current_tx = Some ( tx_id) ;
96- } else {
97- return Err ( SQLiteError :: from ( ResultCode :: ABORT ) ) ;
98- }
99-
100- Ok ( ( ) )
101- }
102-
103264extern "C" fn begin ( vtab : * mut sqlite:: vtab ) -> c_int {
104265 let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
105- let result = begin_impl ( tab) ;
266+ let result = tab. begin ( ) ;
106267 vtab_result ( vtab, result)
107268}
108269
109270extern "C" fn commit ( vtab : * mut sqlite:: vtab ) -> c_int {
110271 let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
111- tab. current_tx = None ;
112- tab. insert_statement = None ;
272+ tab. end_transaction ( ) ;
113273 ResultCode :: OK as c_int
114274}
115275
116276extern "C" fn rollback ( vtab : * mut sqlite:: vtab ) -> c_int {
117277 let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
118- tab. current_tx = None ;
119- tab. insert_statement = None ;
278+ tab. end_transaction ( ) ;
120279 // ps_tx will be rolled back automatically
121280 ResultCode :: OK as c_int
122281}
123282
124- fn insert_operation (
125- vtab : * mut sqlite:: vtab ,
126- data : & str ,
127- flags : TableInfoFlags ,
128- ) -> Result < ( ) , SQLiteError > {
129- let tab = unsafe { & mut * ( vtab. cast :: < VirtualTable > ( ) ) } ;
130- if tab. current_tx . is_none ( ) {
131- return Err ( SQLiteError (
132- ResultCode :: MISUSE ,
133- Some ( String :: from ( "No tx_id" ) ) ,
134- ) ) ;
135- }
136- let current_tx = tab. current_tx . unwrap ( ) ;
137- // language=SQLite
138- let statement = tab
139- . insert_statement
140- . as_ref ( )
141- . ok_or ( SQLiteError :: from ( NULL ) ) ?;
142- statement. bind_int64 ( 1 , current_tx) ?;
143- statement. bind_text ( 2 , data, sqlite:: Destructor :: STATIC ) ?;
144- statement. bind_int ( 3 , flags. 0 as i32 ) ?;
145- statement. exec ( ) ?;
146-
147- Ok ( ( ) )
148- }
149-
150283extern "C" fn update (
151284 vtab : * mut sqlite:: vtab ,
152285 argc : c_int ,
@@ -162,12 +295,8 @@ extern "C" fn update(
162295 ResultCode :: MISUSE as c_int
163296 } else if rowid. value_type ( ) == sqlite:: ColumnType :: Null {
164297 // INSERT
165- let data = args[ 2 ] . text ( ) ;
166- let flags = match args[ 3 ] . value_type ( ) {
167- sqlite_nostd:: ColumnType :: Null => TableInfoFlags :: default ( ) ,
168- _ => TableInfoFlags ( args[ 3 ] . int ( ) as u32 ) ,
169- } ;
170- let result = insert_operation ( vtab, data, flags) ;
298+ let tab = unsafe { & * ( vtab. cast :: < VirtualTable > ( ) ) } ;
299+ let result = tab. handle_insert ( & args[ 2 ..] ) ;
171300 vtab_result ( vtab, result)
172301 } else {
173302 // UPDATE - not supported
@@ -207,7 +336,20 @@ static MODULE: sqlite_nostd::module = sqlite_nostd::module {
207336} ;
208337
209338pub fn register ( db : * mut sqlite:: sqlite3 ) -> Result < ( ) , ResultCode > {
210- db. create_module_v2 ( "powersync_crud_" , & MODULE , None , None ) ?;
339+ sqlite:: convert_rc ( sqlite:: create_module_v2 (
340+ db,
341+ SIMPLE_NAME . as_ptr ( ) ,
342+ & MODULE ,
343+ null_mut ( ) ,
344+ None ,
345+ ) ) ?;
346+ sqlite:: convert_rc ( sqlite:: create_module_v2 (
347+ db,
348+ MANUAL_NAME . as_ptr ( ) ,
349+ & MODULE ,
350+ null_mut ( ) ,
351+ None ,
352+ ) ) ?;
211353
212354 Ok ( ( ) )
213355}
0 commit comments