Skip to content

Commit 8de98da

Browse files
authored
Merge pull request #90 from powersync-ja/fix-trailing-statements
Reset cached statements after using them
2 parents 1e76045 + fdda3a2 commit 8de98da

File tree

6 files changed

+37
-53
lines changed

6 files changed

+37
-53
lines changed

Cargo.lock

Lines changed: 0 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ serde = { version = "1.0", default-features = false, features = ["alloc", "deriv
2222
const_format = "0.2.34"
2323
futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] }
2424
rustc-hash = { version = "2.1", default-features = false }
25-
streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }
2625

2726
[dependencies.uuid]
2827
version = "1.4.1"

crates/core/src/sync/storage_adapter.rs

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use core::{assert_matches::debug_assert_matches, fmt::Display};
33
use alloc::{string::ToString, vec::Vec};
44
use serde::Serialize;
55
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
6-
use streaming_iterator::StreamingIterator;
76

87
use crate::{
98
error::SQLiteError,
@@ -25,7 +24,7 @@ use super::{
2524
/// used frequently as an optimization, but we're not taking advantage of that yet.
2625
pub struct StorageAdapter {
2726
pub db: *mut sqlite::sqlite3,
28-
progress_stmt: ManagedStmt,
27+
pub progress_stmt: ManagedStmt,
2928
time_stmt: ManagedStmt,
3029
}
3130

@@ -78,37 +77,22 @@ impl StorageAdapter {
7877
Ok(())
7978
}
8079

81-
pub fn local_progress(
82-
&self,
83-
) -> Result<
84-
impl StreamingIterator<Item = Result<PersistedBucketProgress, ResultCode>>,
85-
ResultCode,
86-
> {
87-
self.progress_stmt.reset()?;
88-
89-
fn step(stmt: &ManagedStmt) -> Result<Option<PersistedBucketProgress>, ResultCode> {
90-
if stmt.step()? == ResultCode::ROW {
91-
let bucket = stmt.column_text(0)?;
92-
let count_at_last = stmt.column_int64(1);
93-
let count_since_last = stmt.column_int64(2);
94-
95-
return Ok(Some(PersistedBucketProgress {
96-
bucket,
97-
count_at_last,
98-
count_since_last,
99-
}));
100-
}
101-
80+
pub fn step_progress(&self) -> Result<Option<PersistedBucketProgress>, ResultCode> {
81+
if self.progress_stmt.step()? == ResultCode::ROW {
82+
let bucket = self.progress_stmt.column_text(0)?;
83+
let count_at_last = self.progress_stmt.column_int64(1);
84+
let count_since_last = self.progress_stmt.column_int64(2);
85+
86+
Ok(Some(PersistedBucketProgress {
87+
bucket,
88+
count_at_last,
89+
count_since_last,
90+
}))
91+
} else {
92+
// Done
93+
self.progress_stmt.reset()?;
10294
Ok(None)
10395
}
104-
105-
Ok(streaming_iterator::from_fn(|| {
106-
match step(&self.progress_stmt) {
107-
Err(e) => Some(Err(e)),
108-
Ok(Some(other)) => Some(Ok(other)),
109-
Ok(None) => None,
110-
}
111-
}))
11296
}
11397

11498
pub fn reset_progress(&self) -> Result<(), ResultCode> {
@@ -239,10 +223,11 @@ impl StorageAdapter {
239223
}
240224

241225
pub fn now(&self) -> Result<Timestamp, ResultCode> {
242-
self.time_stmt.reset()?;
243226
self.time_stmt.step()?;
227+
let res = Timestamp(self.time_stmt.column_int64(0));
228+
self.time_stmt.reset()?;
244229

245-
Ok(Timestamp(self.time_stmt.column_int64(0)))
230+
Ok(res)
246231
}
247232
}
248233

crates/core/src/sync/streaming_sync.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,11 +425,10 @@ impl StreamingSyncIteration {
425425
&self,
426426
checkpoint: &OwnedCheckpoint,
427427
) -> Result<SyncDownloadProgress, SQLiteError> {
428-
let local_progress = self.adapter.local_progress()?;
429428
let SyncProgressFromCheckpoint {
430429
progress,
431430
needs_counter_reset,
432-
} = SyncDownloadProgress::for_checkpoint(checkpoint, local_progress)?;
431+
} = SyncDownloadProgress::for_checkpoint(checkpoint, &self.adapter)?;
433432

434433
if needs_counter_reset {
435434
self.adapter.reset_progress()?;

crates/core/src/sync/sync_status.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ use core::{cell::RefCell, hash::BuildHasher};
33
use rustc_hash::FxBuildHasher;
44
use serde::Serialize;
55
use sqlite_nostd::ResultCode;
6-
use streaming_iterator::StreamingIterator;
6+
7+
use crate::sync::storage_adapter::StorageAdapter;
78

89
use super::{
910
bucket_priority::BucketPriority, interface::Instruction, line::DataLine,
10-
storage_adapter::PersistedBucketProgress, streaming_sync::OwnedCheckpoint,
11+
streaming_sync::OwnedCheckpoint,
1112
};
1213

1314
/// Information about a progressing download.
@@ -187,9 +188,7 @@ pub struct SyncProgressFromCheckpoint {
187188
impl SyncDownloadProgress {
188189
pub fn for_checkpoint<'a>(
189190
checkpoint: &OwnedCheckpoint,
190-
mut local_progress: impl StreamingIterator<
191-
Item = Result<PersistedBucketProgress<'a>, ResultCode>,
192-
>,
191+
adapter: &StorageAdapter,
193192
) -> Result<SyncProgressFromCheckpoint, ResultCode> {
194193
let mut buckets = BTreeMap::<String, BucketProgress>::new();
195194
let mut needs_reset = false;
@@ -206,12 +205,11 @@ impl SyncDownloadProgress {
206205
);
207206
}
208207

209-
while let Some(row) = local_progress.next() {
210-
let row = match row {
211-
Ok(row) => row,
212-
Err(e) => return Err(*e),
213-
};
208+
// Go through local bucket states to detect pending progress from previous sync iterations
209+
// that may have been interrupted.
210+
adapter.progress_stmt.reset()?;
214211

212+
while let Some(row) = adapter.step_progress()? {
215213
let Some(progress) = buckets.get_mut(row.bucket) else {
216214
continue;
217215
};
@@ -232,6 +230,8 @@ impl SyncDownloadProgress {
232230
}
233231
}
234232

233+
adapter.progress_stmt.reset()?;
234+
235235
Ok(SyncProgressFromCheckpoint {
236236
progress: Self { buckets },
237237
needs_counter_reset: needs_reset,

dart/test/sync_test.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ void _syncTests<T>({
5252
db.execute('begin');
5353
final [row] =
5454
db.select('SELECT powersync_control(?, ?)', [operation, data]);
55+
56+
// Make sure that powersync_control doesn't leave any busy statements
57+
// behind.
58+
// TODO: Re-enable after we can guarantee sqlite_stmt being available
59+
// const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;';
60+
// final busy = db.select(statement, [statement]);
61+
// expect(busy, isEmpty);
62+
5563
db.execute('commit');
5664
return jsonDecode(row.columnAt(0));
5765
}

0 commit comments

Comments
 (0)