Skip to content

Commit 756f514

Browse files
committed
Port existing sync tests
1 parent 1a9d97e commit 756f514

File tree

10 files changed

+580
-576
lines changed

10 files changed

+580
-576
lines changed

crates/core/src/bson/de.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ impl<'de, 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
166166
// position should not be parsed, it should be forwarded as an embedded byte array.
167167
if name == Deserializer::SPECIAL_CASE_EMBEDDED_DOCUMENT {
168168
return if matches!(kind, ElementType::Document) {
169-
let object = self.object_reader()?;
170-
visitor.visit_borrowed_bytes(object.parser.remaining())
169+
let object = self.parser.skip_document()?;
170+
visitor.visit_borrowed_bytes(object)
171171
} else {
172172
self.deserialize_any(visitor)
173173
};

crates/core/src/bson/parser.rs

+19
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,25 @@ impl<'de> Parser<'de> {
147147
self.subreader(total_size - 4)
148148
}
149149

150+
/// Skips over a document at the current offset, returning the bytes making up the document.
151+
pub fn skip_document(&mut self) -> Result<&'de [u8], BsonError> {
152+
let Some(peek_size) = self.remaining_input.get(0..4) else {
153+
return Err(self.error(ErrorKind::UnexpectedEoF));
154+
};
155+
156+
let parsed_size = u32::try_from(i32::from_le_bytes(
157+
peek_size.try_into().expect("should have correct length"),
158+
))
159+
.and_then(usize::try_from)
160+
.map_err(|_| self.error(ErrorKind::InvalidSize))?;
161+
162+
if parsed_size < 5 || parsed_size >= self.remaining_input.len() {
163+
return Err(self.error(ErrorKind::InvalidSize))?;
164+
}
165+
166+
Ok(self.subreader(parsed_size)?.remaining())
167+
}
168+
150169
/// If only a single byte is left in the current scope, assert that it is a zero byte.
151170
///
152171
/// Otherwise returns false as we haven't reached the end of a document.

crates/core/src/bson/writer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl BsonWriter {
2626
self.put_entry(ElementType::String, name);
2727

2828
let bytes = value.as_bytes();
29-
self.output.put_i32_le(bytes.len() as i32);
29+
self.output.put_i32_le((bytes.len() + 1) as i32);
3030
self.output.put_slice(bytes);
3131
self.output.push(0);
3232
}

crates/core/src/schema/management.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ SELECT
3737
let local_only = statement.column_int(2) != 0;
3838

3939
db.exec_safe(&format!(
40-
"CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data TEXT)",
40+
"CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data ANY) STRICT",
4141
quote_identifier(internal_name)
4242
))
4343
.into_db_result(db)?;

crates/core/src/sync/storage_adapter.rs

+19
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,25 @@ GROUP BY bucket_list.bucket",
271271
if sync_result == 1 {
272272
// TODO: Force compact
273273

274+
if priority.is_none() {
275+
// Reset progress counters. We only do this for a complete sync, as we want a
276+
// download progress to always cover a complete checkpoint instead of resetting for
277+
// partial completions.
278+
let update = self.db.prepare_v2(
279+
"UPDATE ps_buckets SET count_since_last = 0, count_at_last = ? WHERE name = ?",
280+
)?;
281+
282+
for bucket in checkpoint.buckets.values() {
283+
if let Some(count) = bucket.count {
284+
update.bind_int64(1, count)?;
285+
update.bind_text(2, bucket.bucket.as_str(), sqlite::Destructor::STATIC)?;
286+
287+
update.exec()?;
288+
update.reset()?;
289+
}
290+
}
291+
}
292+
274293
Ok(SyncLocalResult::ChangesApplied)
275294
} else {
276295
Ok(SyncLocalResult::PendingLocalChanges)

crates/core/src/sync/streaming_sync.rs

+45-25
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,18 @@ impl SyncClient {
7070
));
7171
};
7272

73-
let done = handle.run(&mut active)?;
74-
if done {
75-
*state = ClientState::Idle;
76-
}
73+
match handle.run(&mut active) {
74+
Err(e) => {
75+
*state = ClientState::Idle;
76+
return Err(e);
77+
}
78+
Ok(done) => {
79+
if done {
80+
active.instructions.push(Instruction::CloseSyncStream);
81+
*state = ClientState::Idle;
82+
}
83+
}
84+
};
7785

7886
Ok(active.instructions)
7987
}
@@ -236,12 +244,12 @@ impl StreamingSyncIteration {
236244
}
237245
SyncLine::CheckpointDiff(diff) => {
238246
let Some(target) = target.target_checkpoint_mut() else {
239-
event.instructions.push(Instruction::LogLine {
240-
severity: LogSeverity::WARNING,
241-
line: "Received checkpoint_diff without previous checkpoint"
242-
.to_string(),
243-
});
244-
break;
247+
return Err(SQLiteError(
248+
ResultCode::ABORT,
249+
Some(
250+
"Received checkpoint_diff without previous checkpoint".to_string(),
251+
),
252+
));
245253
};
246254

247255
target.apply_diff(&diff);
@@ -255,9 +263,15 @@ impl StreamingSyncIteration {
255263
);
256264
}
257265
SyncLine::CheckpointComplete(checkpoint_complete) => {
258-
let target = target
259-
.target_checkpoint()
260-
.expect("should have target checkpoint");
266+
let Some(target) = target.target_checkpoint_mut() else {
267+
return Err(SQLiteError(
268+
ResultCode::ABORT,
269+
Some(
270+
"Received checkpoint complete without previous checkpoint"
271+
.to_string(),
272+
),
273+
));
274+
};
261275
let result = self.adapter.sync_local(target, None)?;
262276

263277
match result {
@@ -272,7 +286,11 @@ impl StreamingSyncIteration {
272286
break;
273287
}
274288
SyncLocalResult::PendingLocalChanges => {
275-
todo!("Await pending uploads and try again")
289+
event.instructions.push(Instruction::LogLine {
290+
severity: LogSeverity::WARNING,
291+
line: format!("TODO: Await pending uploads and try again"),
292+
});
293+
break;
276294
}
277295
SyncLocalResult::ChangesApplied => {
278296
event.instructions.push(Instruction::LogLine {
@@ -290,9 +308,15 @@ impl StreamingSyncIteration {
290308
}
291309
SyncLine::CheckpointPartiallyComplete(complete) => {
292310
let priority = complete.priority;
293-
let target = target
294-
.target_checkpoint()
295-
.expect("should have target checkpoint");
311+
let Some(target) = target.target_checkpoint_mut() else {
312+
return Err(SQLiteError(
313+
ResultCode::ABORT,
314+
Some(
315+
"Received checkpoint complete without previous checkpoint"
316+
.to_string(),
317+
),
318+
));
319+
};
296320
let result = self.adapter.sync_local(target, Some(priority))?;
297321

298322
match result {
@@ -356,7 +380,7 @@ impl StreamingSyncIteration {
356380
.update(|s| s.start_connecting(), &mut event.instructions);
357381

358382
let requests = self.adapter.collect_bucket_requests()?;
359-
let local_bucket_names: Vec<String> = requests.iter().map(|s| s.after.clone()).collect();
383+
let local_bucket_names: Vec<String> = requests.iter().map(|s| s.name.clone()).collect();
360384
let request = StreamingSyncRequest {
361385
buckets: requests,
362386
include_checksum: true,
@@ -372,6 +396,7 @@ impl StreamingSyncIteration {
372396
}
373397
}
374398

399+
#[derive(Debug)]
375400
enum SyncTarget {
376401
/// We've received a checkpoint line towards the given checkpoint. The tracked checkpoint is
377402
/// updated for subsequent checkpoint or checkpoint_diff lines.
@@ -413,6 +438,7 @@ impl SyncTarget {
413438
}
414439
}
415440

441+
#[derive(Debug)]
416442
pub struct OwnedCheckpoint {
417443
pub last_op_id: i64,
418444
pub write_checkpoint: Option<i64>,
@@ -446,6 +472,7 @@ impl OwnedCheckpoint {
446472
}
447473
}
448474

475+
#[derive(Debug)]
449476
pub struct OwnedBucketChecksum {
450477
pub bucket: String,
451478
pub checksum: i32,
@@ -461,13 +488,6 @@ impl OwnedBucketChecksum {
461488
Some(prio) => self.priority >= prio,
462489
}
463490
}
464-
465-
fn description(&self) -> BucketDescription {
466-
BucketDescription {
467-
priority: self.priority,
468-
name: self.bucket.clone(),
469-
}
470-
}
471491
}
472492

473493
impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {

crates/core/src/sync_local.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl<'a> SyncOperation<'a> {
115115
let type_name = statement.column_text(0)?;
116116
let id = statement.column_text(1)?;
117117
let buckets = statement.column_int(3);
118-
let data = statement.column_text(2);
118+
let data = statement.column_value(2)?;
119119

120120
let table_name = internal_table_name(type_name);
121121

@@ -137,7 +137,7 @@ impl<'a> SyncOperation<'a> {
137137
.prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
138138
.into_db_result(self.db)?;
139139
insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
140-
insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
140+
insert_statement.bind_value(2, data)?;
141141
insert_statement.exec()?;
142142
}
143143
} else {
@@ -160,7 +160,7 @@ impl<'a> SyncOperation<'a> {
160160
.into_db_result(self.db)?;
161161
insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
162162
insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
163-
insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
163+
insert_statement.bind_value(3, data)?;
164164
insert_statement.exec()?;
165165
}
166166
}

crates/core/src/views.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ fn powersync_view_sql_impl(
4444
column_names_quoted.push(quote_identifier(name));
4545

4646
let foo = format!(
47-
"CAST(powersync_extract(data, {:}) as {:})",
47+
"CAST(iif(typeof(data) == 'blob', powersync_extract(data, {:}), json_extract(data, {:})) as {:})",
48+
quote_string(name),
4849
quote_json_path(name),
4950
type_name
5051
);

0 commit comments

Comments
 (0)