@@ -79,15 +79,16 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, S
79
79
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
80
80
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
81
81
WITH updated_rows AS (
82
- SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
82
+ SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
83
83
CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
84
- WHERE buckets.priority <= ?
85
- UNION SELECT row_type, row_id FROM ps_updated_rows
84
+ WHERE buckets.priority <= ?1
85
+ UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
86
86
)
87
87
88
88
-- 3. Group the objects from different buckets together into a single one (ops).
89
89
SELECT b.row_type as type,
90
90
b.row_id as id,
91
+ b.local as local,
91
92
r.data as data,
92
93
count(r.bucket) as buckets,
93
94
/* max() affects which row is used for 'data' */
@@ -97,6 +98,7 @@ FROM updated_rows b
97
98
LEFT OUTER JOIN ps_oplog AS r
98
99
ON r.row_type = b.row_type
99
100
AND r.row_id = b.row_id
101
+ AND (SELECT priority FROM ps_buckets WHERE id = r.bucket) <= ?1
100
102
-- Group for (3)
101
103
GROUP BY b.row_type, b.row_id" ,
102
104
)
@@ -108,11 +110,18 @@ GROUP BY b.row_type, b.row_id",
108
110
while statement. step ( ) . into_db_result ( db) ? == ResultCode :: ROW {
109
111
let type_name = statement. column_text ( 0 ) ?;
110
112
let id = statement. column_text ( 1 ) ?;
111
- let buckets = statement. column_int ( 3 ) ?;
112
- let data = statement. column_text ( 2 ) ;
113
+ let local = statement. column_int ( 2 ) ? == 1 ;
114
+ let buckets = statement. column_int ( 4 ) ?;
115
+ let data = statement. column_text ( 3 ) ;
113
116
114
117
let table_name = internal_table_name ( type_name) ;
115
118
119
+ if local && buckets == 0 && priority == BucketPriority :: HIGHEST {
120
+ // These rows are still local and they haven't been uploaded yet (which we allow for
121
+ // buckets with priority=0 completing). We should just keep them around.
122
+ continue ;
123
+ }
124
+
116
125
if tables. contains ( & table_name) {
117
126
let quoted = quote_internal_name ( type_name, false ) ;
118
127
@@ -157,20 +166,27 @@ GROUP BY b.row_type, b.row_id",
157
166
}
158
167
159
168
// language=SQLite
160
- db. exec_safe (
161
- "UPDATE ps_buckets
169
+ let updated = db
170
+ . prepare_v2 (
171
+ "UPDATE ps_buckets
162
172
SET last_applied_op = last_op
163
- WHERE last_applied_op != last_op" ,
164
- )
165
- . into_db_result ( db) ?;
166
-
167
- // language=SQLite
168
- db. exec_safe ( "DELETE FROM ps_updated_rows" )
173
+ WHERE last_applied_op != last_op AND priority <= ?" ,
174
+ )
169
175
. into_db_result ( db) ?;
176
+ updated. bind_int ( 1 , priority. into ( ) ) ?;
177
+ updated. exec ( ) ?;
170
178
171
- // language=SQLite
172
- db. exec_safe ( "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())" )
179
+ if priority == BucketPriority :: LOWEST {
180
+ // language=SQLite
181
+ db. exec_safe ( "DELETE FROM ps_updated_rows" )
182
+ . into_db_result ( db) ?;
183
+
184
+ // language=SQLite
185
+ db. exec_safe (
186
+ "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())" ,
187
+ )
173
188
. into_db_result ( db) ?;
189
+ }
174
190
175
191
Ok ( 1 )
176
192
}
0 commit comments