@@ -45,6 +45,7 @@ pub fn insert_bucket_operations(
4545 bucket : & str ,
4646 data : & str ,
4747) -> Result < ( ) , SQLiteError > {
48+ // Statement to insert new operations (only for PUT and REMOVE).
4849 // language=SQLite
4950 let iterate_statement = db. prepare_v2 (
5051 "\
@@ -60,6 +61,7 @@ FROM json_each(?) e",
6061 ) ?;
6162 iterate_statement. bind_text ( 1 , data, sqlite:: Destructor :: STATIC ) ?;
6263
64+ // Statement to supersede (replace) operations with the same key.
6365 // language=SQLite
6466 let supersede_statement = db. prepare_v2 (
6567 "\
@@ -76,13 +78,15 @@ RETURNING op_id, hash",
7678 INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)") ?;
7779 insert_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
7880
81+ // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
82+ // We can consider splitting this into separate SELECT and INSERT statements.
7983 // language=SQLite
8084 let bucket_statement = db. prepare_v2 (
8185 "INSERT INTO ps_buckets(name)
8286 VALUES(?)
83- ON CONFLICT DO UPDATE
84- SET last_applied_op = last_applied_op
85- RETURNING last_applied_op" ,
87+ ON CONFLICT DO UPDATE
88+ SET last_applied_op = last_applied_op
89+ RETURNING last_applied_op" ,
8690 ) ?;
8791 bucket_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
8892 bucket_statement. step ( ) ?;
@@ -91,7 +95,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
9195 // operations when last_applied_op = 0.
9296 // We do still need to do the "supersede_statement" step for this case, since a REMOVE
9397 // operation can supersede another PUT operation we're syncing at the same time.
94- let mut is_empty = bucket_statement. column_int64 ( 0 ) ? == 0 ;
98+ let mut last_applied_op = bucket_statement. column_int64 ( 0 ) ?;
9599
96100 bucket_statement. reset ( ) ?;
97101
@@ -121,16 +125,23 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
121125
122126 let mut superseded = false ;
123127
124- while ( supersede_statement. step ( ) ? == ResultCode :: ROW ) {
128+ while supersede_statement. step ( ) ? == ResultCode :: ROW {
125129 // Superseded (deleted) a previous operation, add the checksum
130+ let superseded_op = supersede_statement. column_int64 ( 0 ) ?;
126131 let supersede_checksum = supersede_statement. column_int ( 1 ) ?;
127132 add_checksum = add_checksum. wrapping_add ( supersede_checksum) ;
128- // Superseded an operation, only skip if the bucket was empty
129- superseded = true ;
133+
134+ if superseded_op <= last_applied_op {
135+ // Superseded an operation previously applied - we cannot skip removes
136+ // For initial sync, last_applied_op = 0, so this is always false.
137+ // For subsequent sync, this is only true if the row was previously
138+ // synced, not when it was first synced in the current batch.
139+ superseded = true ;
140+ }
130141 }
131142 supersede_statement. reset ( ) ?;
132143
133- let should_skip_remove = ( is_empty || !superseded) && op == "REMOVE" ;
144+ let should_skip_remove = !superseded && op == "REMOVE" ;
134145 if ( should_skip_remove) {
135146 // If a REMOVE statement did not replace (supersede) any previous
136147 // operations, we do not need to persist it.
@@ -185,7 +196,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
185196 clear_statement2. exec ( ) ?;
186197
187198 add_checksum = 0 ;
188- is_empty = true ;
199+ last_applied_op = 0 ;
189200 }
190201 }
191202
0 commit comments