Skip to content

Commit 38710a6

Browse files
committed
Fix races in new sync client
1 parent 56457b9 commit 38710a6

File tree

4 files changed

+82
-80
lines changed

4 files changed

+82
-80
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
675675
verifyNoMoreCalls(connector)
676676

677677
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10))
678-
prefetchCalled.complete(Unit)
678+
prefetchCalled.await()
679679
// Should still be connected before prefetch completes
680680
database.currentStatus.connected shouldBe true
681681

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import com.powersync.sync.Instruction
66
import com.powersync.sync.LegacySyncImplementation
77
import com.powersync.sync.SyncDataBatch
88
import com.powersync.sync.SyncLocalDatabaseResult
9+
import kotlinx.serialization.Serializable
10+
import kotlinx.serialization.json.Json
11+
import kotlinx.serialization.json.JsonObject
912

1013
internal interface BucketStorage {
1114
fun getMaxOpId(): String
@@ -50,13 +53,27 @@ internal interface BucketStorage {
5053
partialPriority: BucketPriority? = null,
5154
): SyncLocalDatabaseResult
5255

53-
suspend fun control(
54-
op: String,
55-
payload: String?,
56-
): List<Instruction>
56+
suspend fun control(args: PowerSyncControlArguments): List<Instruction>
57+
}
5758

58-
suspend fun control(
59-
op: String,
60-
payload: ByteArray,
61-
): List<Instruction>
59+
internal sealed interface PowerSyncControlArguments {
60+
@Serializable
61+
class Start(
62+
val parameters: JsonObject
63+
): PowerSyncControlArguments
64+
data object Stop: PowerSyncControlArguments
65+
66+
data class TextLine(val line: String): PowerSyncControlArguments
67+
class BinaryLine(val line: ByteArray): PowerSyncControlArguments {
68+
override fun toString(): String {
69+
return "BinaryLine"
70+
}
71+
}
72+
73+
data object CompletedUpload: PowerSyncControlArguments
6274
}
75+
76+
@Serializable
77+
internal class StartSyncIteration(
78+
val parameters: JsonObject,
79+
)

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -365,22 +365,20 @@ internal class BucketStorageImpl(
365365
return JsonUtil.json.decodeFromString<List<Instruction>>(result)
366366
}
367367

368-
override suspend fun control(
369-
op: String,
370-
payload: String?,
371-
): List<Instruction> =
368+
override suspend fun control(args: PowerSyncControlArguments): List<Instruction> =
372369
db.writeTransaction { tx ->
373-
logger.v { "powersync_control($op, $payload)" }
370+
logger.v { "powersync_control: $args" }
374371

375-
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult)
376-
}
372+
val (op: String, data: Any?) = when (args) {
373+
is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args)
374+
PowerSyncControlArguments.Stop -> "stop" to null
377375

378-
override suspend fun control(
379-
op: String,
380-
payload: ByteArray,
381-
): List<Instruction> =
382-
db.writeTransaction { tx ->
383-
logger.v { "powersync_control($op, binary payload)" }
384-
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult)
376+
PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null
377+
378+
is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line
379+
is PowerSyncControlArguments.TextLine -> "line_text" to args.line
380+
}
381+
382+
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult)
385383
}
386384
}

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 44 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.powersync.bucket.BucketChecksum
88
import com.powersync.bucket.BucketRequest
99
import com.powersync.bucket.BucketStorage
1010
import com.powersync.bucket.Checkpoint
11+
import com.powersync.bucket.PowerSyncControlArguments
1112
import com.powersync.bucket.WriteCheckpointResponse
1213
import com.powersync.connectors.PowerSyncBackendConnector
1314
import com.powersync.db.crud.CrudEntry
@@ -39,9 +40,11 @@ import kotlinx.coroutines.NonCancellable
3940
import kotlinx.coroutines.cancelAndJoin
4041
import kotlinx.coroutines.channels.BufferOverflow
4142
import kotlinx.coroutines.channels.Channel
43+
import kotlinx.coroutines.channels.consume
4244
import kotlinx.coroutines.coroutineScope
4345
import kotlinx.coroutines.delay
4446
import kotlinx.coroutines.flow.Flow
47+
import kotlinx.coroutines.flow.collect
4548
import kotlinx.coroutines.flow.emitAll
4649
import kotlinx.coroutines.flow.flow
4750
import kotlinx.coroutines.launch
@@ -297,43 +300,41 @@ internal class SyncStream(
297300
*/
298301
private inner class ActiveIteration(
299302
val scope: CoroutineScope,
300-
var hadSyncLine: Boolean = false,
301303
var fetchLinesJob: Job? = null,
302304
var credentialsInvalidation: Job? = null,
303305
) {
304-
suspend fun start() {
305-
@Serializable
306-
class StartParameters(
307-
val parameters: JsonObject,
308-
)
309-
310-
control("start", JsonUtil.json.encodeToString(StartParameters(params)))
311-
fetchLinesJob?.join()
312-
}
306+
// Using a channel for control invocations so that they're handled by a single coroutine,
307+
// avoiding races between concurrent jobs like fetching credentials.
308+
private val controlInvocations = Channel<PowerSyncControlArguments>()
313309

314-
suspend fun stop() {
315-
control("stop")
316-
fetchLinesJob?.join()
317-
}
318-
319-
private suspend fun control(
320-
op: String,
321-
payload: String? = null,
322-
) {
323-
val instructions = bucketStorage.control(op, payload)
324-
handleInstructions(instructions)
310+
private suspend fun invokeControl(args: PowerSyncControlArguments) {
311+
val instructions = bucketStorage.control(args)
312+
instructions.forEach { handleInstruction(it) }
325313
}
326314

327-
private suspend fun control(
328-
op: String,
329-
payload: ByteArray,
330-
) {
331-
val instructions = bucketStorage.control(op, payload)
332-
handleInstructions(instructions)
315+
suspend fun start() {
316+
invokeControl(PowerSyncControlArguments.Start(params))
317+
318+
var hadSyncLine = false
319+
for (line in controlInvocations) {
320+
val instructions = bucketStorage.control(line)
321+
instructions.forEach { handleInstruction(it) }
322+
323+
if (!hadSyncLine && (line is PowerSyncControlArguments.TextLine || line is PowerSyncControlArguments.BinaryLine)) {
324+
// Trigger a crud upload when receiving the first sync line: We could have
325+
// pending local writes made while disconnected, so in addition to listening on
326+
// updates to `ps_crud`, we also need to trigger a CRUD upload in some other
327+
// cases. We do this on the first sync line because the client is likely to be
328+
// online in that case.
329+
hadSyncLine = true
330+
triggerCrudUploadAsync()
331+
}
332+
}
333333
}
334334

335-
private suspend fun handleInstructions(instructions: List<Instruction>) {
336-
instructions.forEach { handleInstruction(it) }
335+
suspend fun stop() {
336+
invokeControl(PowerSyncControlArguments.Stop)
337+
fetchLinesJob?.join()
337338
}
338339

339340
private suspend fun handleInstruction(instruction: Instruction) {
@@ -344,20 +345,25 @@ internal class SyncStream(
344345
scope.launch {
345346
launch {
346347
logger.v { "listening for completed uploads" }
347-
348348
for (completion in completedCrudUploads) {
349-
control("completed_upload")
349+
controlInvocations.send(PowerSyncControlArguments.CompletedUpload)
350350
}
351351
}
352352

353353
launch {
354354
connect(instruction)
355355
}
356+
}.also {
357+
it.invokeOnCompletion {
358+
controlInvocations.close()
359+
}
356360
}
357361
}
358362
Instruction.CloseSyncStream -> {
363+
logger.v { "Closing sync stream connection" }
359364
fetchLinesJob!!.cancelAndJoin()
360365
fetchLinesJob = null
366+
logger.v { "Sync stream connection shut down" }
361367
}
362368
Instruction.FlushSileSystem -> {
363369
// We have durable file systems, so flushing is not necessary
@@ -389,6 +395,7 @@ internal class SyncStream(
389395
val job =
390396
scope.launch {
391397
connector.prefetchCredentials().join()
398+
logger.v { "Stopping because new credentials are available" }
392399

393400
// Token has been refreshed, start another iteration
394401
stop()
@@ -409,36 +416,16 @@ internal class SyncStream(
409416
}
410417
}
411418

412-
/**
413-
* Triggers a crud upload when called for the first time.
414-
*
415-
* We could have pending local writes made while disconnected, so in addition to listening
416-
* on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We
417-
* do this on the first sync line because the client is likely to be online in that case.
418-
*/
419-
private fun triggerCrudUploadIfFirstLine() {
420-
if (!hadSyncLine) {
421-
triggerCrudUploadAsync()
422-
hadSyncLine = true
423-
}
424-
}
425-
426-
private suspend fun line(text: String) {
427-
control("line_text", text)
428-
triggerCrudUploadIfFirstLine()
429-
}
430-
431-
private suspend fun line(blob: ByteArray) {
432-
control("line_binary", blob)
433-
triggerCrudUploadIfFirstLine()
434-
}
435-
436419
private suspend fun connect(start: Instruction.EstablishSyncStream) {
437420
when (val method = options.method) {
438421
ConnectionMethod.Http ->
439-
connectViaHttp(start.request).collect(this::line)
422+
connectViaHttp(start.request).collect {
423+
controlInvocations.send(PowerSyncControlArguments.TextLine(it))
424+
}
440425
is ConnectionMethod.WebSocket ->
441-
connectViaWebSocket(start.request, method).collect(this::line)
426+
connectViaWebSocket(start.request, method).collect {
427+
controlInvocations.send(PowerSyncControlArguments.BinaryLine(it))
428+
}
442429
}
443430
}
444431
}

0 commit comments

Comments
 (0)