Skip to content

Commit e3996e7

Browse files
CrudBatch hasMore + onChange improvement (#182)
1 parent 664596b commit e3996e7

File tree

5 files changed

+62
-7
lines changed

5 files changed

+62
-7
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## unreleased
4+
5+
* Fixed `CrudBatch` `hasMore` always returning false.
6+
* Added `triggerImmediately` to `onChange` method.
7+
38
## 1.0.0-BETA32
49

510
* Added `onChange` method to the PowerSync client. This allows for observing table changes.

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

+43-2
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,22 @@ class DatabaseTest {
187187
fun testTableChangesUpdates() =
188188
databaseTest {
189189
turbineScope {
190-
val query = database.onChange(tables = setOf("users")).testIn(this)
190+
val query =
191+
database
192+
.onChange(
193+
tables = setOf("users"),
194+
).testIn(this)
191195

192196
database.execute(
193197
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
194198
listOf("Test", "[email protected]"),
195199
)
196200

197-
val changeSet = query.awaitItem()
201+
var changeSet = query.awaitItem()
202+
// The initial result
203+
changeSet.count() shouldBe 0
204+
205+
changeSet = query.awaitItem()
198206
changeSet.count() shouldBe 1
199207
changeSet.contains("users") shouldBe true
200208

@@ -418,4 +426,37 @@ class DatabaseTest {
418426

419427
database.getNextCrudTransaction() shouldBe null
420428
}
429+
430+
@Test
431+
fun testCrudBatch() =
432+
databaseTest {
433+
database.execute(
434+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
435+
listOf("a", "[email protected]"),
436+
)
437+
438+
database.writeTransaction {
439+
it.execute(
440+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
441+
listOf("b", "[email protected]"),
442+
)
443+
it.execute(
444+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
445+
listOf("c", "[email protected]"),
446+
)
447+
}
448+
449+
// Purposely limit to less than the number of available ops
450+
var batch = database.getCrudBatch(2) ?: error("Batch should not be null")
451+
batch.hasMore shouldBe true
452+
batch.crud shouldHaveSize 2
453+
batch.complete(null)
454+
455+
batch = database.getCrudBatch(1000) ?: error("Batch should not be null")
456+
batch.crud shouldHaveSize 1
457+
batch.hasMore shouldBe false
458+
batch.complete(null)
459+
460+
database.getCrudBatch() shouldBe null
461+
}
421462
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,10 @@ internal class PowerSyncDatabaseImpl(
258258
return null
259259
}
260260

261-
val entries =
261+
var entries =
262262
internalDb.getAll(
263263
"SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?",
264-
listOf(limit.toLong()),
264+
listOf(limit.toLong() + 1),
265265
) {
266266
CrudEntry.fromRow(
267267
CrudRow(
@@ -278,7 +278,7 @@ internal class PowerSyncDatabaseImpl(
278278

279279
val hasMore = entries.size > limit
280280
if (hasMore) {
281-
entries.dropLast(entries.size - limit)
281+
entries = entries.dropLast(1)
282282
}
283283

284284
return CrudBatch(entries, hasMore, complete = { writeCheckpoint ->
@@ -351,11 +351,12 @@ internal class PowerSyncDatabaseImpl(
351351
override fun onChange(
352352
tables: Set<String>,
353353
throttleMs: Long,
354+
triggerImmediately: Boolean,
354355
): Flow<Set<String>> =
355356
flow {
356357
waitReady()
357358
emitAll(
358-
internalDb.onChange(tables, throttleMs),
359+
internalDb.onChange(tables, throttleMs, triggerImmediately),
359360
)
360361
}
361362

core/src/commonMain/kotlin/com/powersync/db/Queries.kt

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public interface Queries {
9797
*
9898
* @param tables The set of tables to monitor for changes.
9999
* @param throttleMs The minimum interval, in milliseconds, between emissions. Defaults to [DEFAULT_THROTTLE]. Table changes are accumulated while throttling is active. The accumulated set of tables will be emitted on the trailing edge of the throttle.
100+
* @param triggerImmediately If true (default), the flow will immediately emit an empty set of tables when the flow is first collected. This can be useful for ensuring that the flow emits at least once, even if no changes occur to the monitored tables.
100101
* @return A [Flow] emitting the set of modified tables.
101102
* @throws PowerSyncException If a database error occurs.
102103
* @throws CancellationException If the operation is cancelled.
@@ -105,6 +106,7 @@ public interface Queries {
105106
public fun onChange(
106107
tables: Set<String>,
107108
throttleMs: Long = DEFAULT_THROTTLE.inWholeMilliseconds,
109+
triggerImmediately: Boolean = true,
108110
): Flow<Set<String>>
109111

110112
/**

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ internal class InternalDatabaseImpl(
106106
override fun onChange(
107107
tables: Set<String>,
108108
throttleMs: Long,
109+
triggerImmediately: Boolean,
109110
): Flow<Set<String>> =
110111
channelFlow {
111112
// Match all possible internal table combinations
@@ -116,7 +117,12 @@ internal class InternalDatabaseImpl(
116117
val batchedUpdates = AtomicMutableSet<String>()
117118

118119
updatesOnTables()
119-
.transform { updates ->
120+
.onSubscription {
121+
if (triggerImmediately) {
122+
// Emit an initial event (if requested). No changes would be detected at this point
123+
send(setOf())
124+
}
125+
}.transform { updates ->
120126
val intersection = updates.intersect(watchedTables)
121127
if (intersection.isNotEmpty()) {
122128
// Transform table names using friendlyTableName

0 commit comments

Comments
 (0)