1
1
use alloc:: format;
2
2
use alloc:: string:: String ;
3
+ use alloc:: vec:: Vec ;
3
4
use num_traits:: Zero ;
5
+ use serde:: Deserialize ;
4
6
5
7
use crate :: error:: { PSResult , SQLiteError } ;
8
+ use crate :: sync:: line:: DataLine ;
9
+ use crate :: sync:: operations:: insert_bucket_operations;
6
10
use crate :: sync:: Checksum ;
7
11
use sqlite_nostd as sqlite;
8
12
use sqlite_nostd:: { Connection , ResultCode } ;
@@ -11,246 +15,15 @@ use crate::ext::SafeManagedStmt;
11
15
12
16
// Run inside a transaction
13
17
pub fn insert_operation ( db : * mut sqlite:: sqlite3 , data : & str ) -> Result < ( ) , SQLiteError > {
14
- // language=SQLite
15
- let statement = db. prepare_v2 (
16
- "\
17
- SELECT
18
- json_extract(e.value, '$.bucket') as bucket,
19
- json_extract(e.value, '$.data') as data,
20
- json_extract(e.value, '$.has_more') as has_more,
21
- json_extract(e.value, '$.after') as after,
22
- json_extract(e.value, '$.next_after') as next_after
23
- FROM json_each(json_extract(?1, '$.buckets')) e" ,
24
- ) ?;
25
- statement. bind_text ( 1 , data, sqlite:: Destructor :: STATIC ) ?;
26
-
27
- while statement. step ( ) ? == ResultCode :: ROW {
28
- let bucket = statement. column_text ( 0 ) ?;
29
- let data = statement. column_text ( 1 ) ?;
30
- // let _has_more = statement.column_int(2)? != 0;
31
- // let _after = statement.column_text(3)?;
32
- // let _next_after = statement.column_text(4)?;
33
-
34
- insert_bucket_operations ( db, bucket, data) ?;
35
- }
36
-
37
- Ok ( ( ) )
38
- }
39
-
40
- pub fn insert_bucket_operations (
41
- db : * mut sqlite:: sqlite3 ,
42
- bucket : & str ,
43
- data : & str ,
44
- ) -> Result < ( ) , SQLiteError > {
45
- // Statement to insert new operations (only for PUT and REMOVE).
46
- // language=SQLite
47
- let iterate_statement = db. prepare_v2 (
48
- "\
49
- SELECT
50
- json_extract(e.value, '$.op_id') as op_id,
51
- json_extract(e.value, '$.op') as op,
52
- json_extract(e.value, '$.object_type') as object_type,
53
- json_extract(e.value, '$.object_id') as object_id,
54
- json_extract(e.value, '$.checksum') as checksum,
55
- json_extract(e.value, '$.data') as data,
56
- json_extract(e.value, '$.subkey') as subkey
57
- FROM json_each(?) e" ,
58
- ) ?;
59
- iterate_statement. bind_text ( 1 , data, sqlite:: Destructor :: STATIC ) ?;
60
-
61
- // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
62
- // We can consider splitting this into separate SELECT and INSERT statements.
63
- // language=SQLite
64
- let bucket_statement = db. prepare_v2 (
65
- "INSERT INTO ps_buckets(name)
66
- VALUES(?)
67
- ON CONFLICT DO UPDATE
68
- SET last_applied_op = last_applied_op
69
- RETURNING id, last_applied_op" ,
70
- ) ?;
71
- bucket_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
72
- bucket_statement. step ( ) ?;
73
-
74
- let bucket_id = bucket_statement. column_int64 ( 0 ) ;
75
-
76
- // This is an optimization for initial sync - we can avoid persisting individual REMOVE
77
- // operations when last_applied_op = 0.
78
- // We do still need to do the "supersede_statement" step for this case, since a REMOVE
79
- // operation can supersede another PUT operation we're syncing at the same time.
80
- let mut is_empty = bucket_statement. column_int64 ( 1 ) == 0 ;
81
-
82
- // Statement to supersede (replace) operations with the same key.
83
- // language=SQLite
84
- let supersede_statement = db. prepare_v2 (
85
- "\
86
- DELETE FROM ps_oplog
87
- WHERE unlikely(ps_oplog.bucket = ?1)
88
- AND ps_oplog.key = ?2
89
- RETURNING op_id, hash" ,
90
- ) ?;
91
- supersede_statement. bind_int64 ( 1 , bucket_id) ?;
92
-
93
- // language=SQLite
94
- let insert_statement = db. prepare_v2 ( "\
95
- INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)") ?;
96
- insert_statement. bind_int64 ( 1 , bucket_id) ?;
97
-
98
- let updated_row_statement = db. prepare_v2 (
99
- "\
100
- INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
101
- ) ?;
102
-
103
- bucket_statement. reset ( ) ?;
104
-
105
- let mut last_op: Option < i64 > = None ;
106
- let mut add_checksum = Checksum :: zero ( ) ;
107
- let mut op_checksum = Checksum :: zero ( ) ;
108
- let mut added_ops: i32 = 0 ;
109
-
110
- while iterate_statement. step ( ) ? == ResultCode :: ROW {
111
- let op_id = iterate_statement. column_int64 ( 0 ) ;
112
- let op = iterate_statement. column_text ( 1 ) ?;
113
- let object_type = iterate_statement. column_text ( 2 ) ;
114
- let object_id = iterate_statement. column_text ( 3 ) ;
115
- let checksum = Checksum :: from_i32 ( iterate_statement. column_int ( 4 ) ) ;
116
- let op_data = iterate_statement. column_text ( 5 ) ;
117
-
118
- last_op = Some ( op_id) ;
119
- added_ops += 1 ;
120
-
121
- if op == "PUT" || op == "REMOVE" {
122
- let key: String ;
123
- if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type. as_ref ( ) , object_id. as_ref ( ) ) {
124
- let subkey = iterate_statement. column_text ( 6 ) . unwrap_or ( "null" ) ;
125
- key = format ! ( "{}/{}/{}" , & object_type, & object_id, subkey) ;
126
- } else {
127
- key = String :: from ( "" ) ;
128
- }
129
-
130
- supersede_statement. bind_text ( 2 , & key, sqlite:: Destructor :: STATIC ) ?;
131
-
132
- let mut superseded = false ;
133
-
134
- while supersede_statement. step ( ) ? == ResultCode :: ROW {
135
- // Superseded (deleted) a previous operation, add the checksum
136
- let supersede_checksum = Checksum :: from_i32 ( supersede_statement. column_int ( 1 ) ) ;
137
- add_checksum += supersede_checksum;
138
- op_checksum -= supersede_checksum;
139
-
140
- // Superseded an operation, only skip if the bucket was empty
141
- // Previously this checked "superseded_op <= last_applied_op".
142
- // However, that would not account for a case where a previous
143
- // PUT operation superseded the original PUT operation in this
144
- // same batch, in which case superseded_op is not accurate for this.
145
- if !is_empty {
146
- superseded = true ;
147
- }
148
- }
149
- supersede_statement. reset ( ) ?;
150
-
151
- if op == "REMOVE" {
152
- let should_skip_remove = !superseded;
153
-
154
- add_checksum += checksum;
155
-
156
- if !should_skip_remove {
157
- if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
158
- updated_row_statement. bind_text (
159
- 1 ,
160
- object_type,
161
- sqlite:: Destructor :: STATIC ,
162
- ) ?;
163
- updated_row_statement. bind_text (
164
- 2 ,
165
- object_id,
166
- sqlite:: Destructor :: STATIC ,
167
- ) ?;
168
- updated_row_statement. exec ( ) ?;
169
- }
170
- }
171
-
172
- continue ;
173
- }
174
-
175
- insert_statement. bind_int64 ( 2 , op_id) ?;
176
- if key != "" {
177
- insert_statement. bind_text ( 3 , & key, sqlite:: Destructor :: STATIC ) ?;
178
- } else {
179
- insert_statement. bind_null ( 3 ) ?;
180
- }
181
-
182
- if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
183
- insert_statement. bind_text ( 4 , object_type, sqlite:: Destructor :: STATIC ) ?;
184
- insert_statement. bind_text ( 5 , object_id, sqlite:: Destructor :: STATIC ) ?;
185
- } else {
186
- insert_statement. bind_null ( 4 ) ?;
187
- insert_statement. bind_null ( 5 ) ?;
188
- }
189
- if let Ok ( data) = op_data {
190
- insert_statement. bind_text ( 6 , data, sqlite:: Destructor :: STATIC ) ?;
191
- } else {
192
- insert_statement. bind_null ( 6 ) ?;
193
- }
194
-
195
- insert_statement. bind_int ( 7 , checksum. bitcast_i32 ( ) ) ?;
196
- insert_statement. exec ( ) ?;
197
-
198
- op_checksum += checksum;
199
- } else if op == "MOVE" {
200
- add_checksum += checksum;
201
- } else if op == "CLEAR" {
202
- // Any remaining PUT operations should get an implicit REMOVE
203
- // language=SQLite
204
- let clear_statement1 = db
205
- . prepare_v2 (
206
- "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
207
- SELECT row_type, row_id
208
- FROM ps_oplog
209
- WHERE bucket = ?1" ,
210
- )
211
- . into_db_result ( db) ?;
212
- clear_statement1. bind_int64 ( 1 , bucket_id) ?;
213
- clear_statement1. exec ( ) ?;
214
-
215
- let clear_statement2 = db
216
- . prepare_v2 ( "DELETE FROM ps_oplog WHERE bucket = ?1" )
217
- . into_db_result ( db) ?;
218
- clear_statement2. bind_int64 ( 1 , bucket_id) ?;
219
- clear_statement2. exec ( ) ?;
220
-
221
- // And we need to re-apply all of those.
222
- // We also replace the checksum with the checksum of the CLEAR op.
223
- // language=SQLite
224
- let clear_statement2 = db. prepare_v2 (
225
- "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2" ,
226
- ) ?;
227
- clear_statement2. bind_int64 ( 2 , bucket_id) ?;
228
- clear_statement2. bind_int ( 1 , checksum. bitcast_i32 ( ) ) ?;
229
- clear_statement2. exec ( ) ?;
230
-
231
- add_checksum = Checksum :: zero ( ) ;
232
- is_empty = true ;
233
- op_checksum = Checksum :: zero ( ) ;
234
- }
18
+ #[ derive( Deserialize ) ]
19
+ struct BucketBatch < ' a > {
20
+ #[ serde( borrow) ]
21
+ buckets : Vec < DataLine < ' a > > ,
235
22
}
236
23
237
- if let Some ( last_op) = & last_op {
238
- // language=SQLite
239
- let statement = db. prepare_v2 (
240
- "UPDATE ps_buckets
241
- SET last_op = ?2,
242
- add_checksum = (add_checksum + ?3) & 0xffffffff,
243
- op_checksum = (op_checksum + ?4) & 0xffffffff,
244
- count_since_last = count_since_last + ?5
245
- WHERE id = ?1" ,
246
- ) ?;
247
- statement. bind_int64 ( 1 , bucket_id) ?;
248
- statement. bind_int64 ( 2 , * last_op) ?;
249
- statement. bind_int ( 3 , add_checksum. bitcast_i32 ( ) ) ?;
250
- statement. bind_int ( 4 , op_checksum. bitcast_i32 ( ) ) ?;
251
- statement. bind_int ( 5 , added_ops) ?;
252
-
253
- statement. exec ( ) ?;
24
+ let batch: BucketBatch = serde_json:: from_str ( data) ?;
25
+ for line in & batch. buckets {
26
+ insert_bucket_operations ( db, line) ?;
254
27
}
255
28
256
29
Ok ( ( ) )
0 commit comments