From 56457b9c855949f96b45eec7b3b00a85719ecdce Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 1 Jul 2025 16:07:42 -0600 Subject: [PATCH 1/5] Fix crud uploads for websockets --- CHANGELOG.md | 5 +++++ core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de37ceba..3e36294e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.2.2 + +* Supabase: Avoid creating `Json` serializers multiple times. +* Fix local writes not being uploaded correctly when using WebSockets as a transport protocol. + ## 1.2.1 * [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`. diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 9559f6fb..745fe3aa 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -424,13 +424,13 @@ internal class SyncStream( } private suspend fun line(text: String) { - triggerCrudUploadIfFirstLine() control("line_text", text) + triggerCrudUploadIfFirstLine() } private suspend fun line(blob: ByteArray) { - triggerCrudUploadIfFirstLine() control("line_binary", blob) + triggerCrudUploadIfFirstLine() } private suspend fun connect(start: Instruction.EstablishSyncStream) { From 38710a6b6aab65e229203b54805f2be00894419f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Jul 2025 09:55:57 -0600 Subject: [PATCH 2/5] Fix races in new sync client --- .../com/powersync/sync/SyncIntegrationTest.kt | 2 +- .../com/powersync/bucket/BucketStorage.kt | 33 ++++-- .../com/powersync/bucket/BucketStorageImpl.kt | 26 +++-- .../kotlin/com/powersync/sync/SyncStream.kt | 101 ++++++++---------- 4 files changed, 82 insertions(+), 80 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index f780a0d7..9668170a 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -675,7 +675,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { verifyNoMoreCalls(connector) syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10)) - prefetchCalled.complete(Unit) + prefetchCalled.await() // Should still be connected before prefetch completes database.currentStatus.connected shouldBe true diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 6741777c..82c2e054 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -6,6 +6,9 @@ import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject internal interface BucketStorage { fun getMaxOpId(): String @@ -50,13 +53,27 @@ internal interface BucketStorage { partialPriority: BucketPriority? = null, ): SyncLocalDatabaseResult - suspend fun control( - op: String, - payload: String?, - ): List + suspend fun control(args: PowerSyncControlArguments): List +} - suspend fun control( - op: String, - payload: ByteArray, - ): List +internal sealed interface PowerSyncControlArguments { + @Serializable + class Start( + val parameters: JsonObject + ): PowerSyncControlArguments + data object Stop: PowerSyncControlArguments + + data class TextLine(val line: String): PowerSyncControlArguments + class BinaryLine(val line: ByteArray): PowerSyncControlArguments { + override fun toString(): String { + return "BinaryLine" + } + } + + data object CompletedUpload: PowerSyncControlArguments } + +@Serializable +internal class StartSyncIteration( + val parameters: JsonObject, +) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 1bbc3a9e..22038aed 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -365,22 +365,20 @@ internal class BucketStorageImpl( return JsonUtil.json.decodeFromString>(result) } - override suspend fun control( - op: String, - payload: String?, - ): List = + override suspend fun control(args: PowerSyncControlArguments): List = db.writeTransaction { tx -> - logger.v { "powersync_control($op, $payload)" } + logger.v { "powersync_control: $args" } - tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult) - } + val (op: String, data: Any?) = when (args) { + is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args) + PowerSyncControlArguments.Stop -> "stop" to null - override suspend fun control( - op: String, - payload: ByteArray, - ): List = - db.writeTransaction { tx -> - logger.v { "powersync_control($op, binary payload)" } - tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult) + PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null + + is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line + is PowerSyncControlArguments.TextLine -> "line_text" to args.line + } + + tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult) } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 745fe3aa..99442988 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -8,6 +8,7 @@ import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest import com.powersync.bucket.BucketStorage import com.powersync.bucket.Checkpoint +import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry @@ -39,9 +40,11 @@ import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.consume import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch @@ -297,43 +300,41 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, - var hadSyncLine: Boolean = false, var fetchLinesJob: Job? = null, var credentialsInvalidation: Job? = null, ) { - suspend fun start() { - @Serializable - class StartParameters( - val parameters: JsonObject, - ) - - control("start", JsonUtil.json.encodeToString(StartParameters(params))) - fetchLinesJob?.join() - } + // Using a channel for control invocations so that they're handled by a single coroutine, + // avoiding races between concurrent jobs like fetching credentials. + private val controlInvocations = Channel() - suspend fun stop() { - control("stop") - fetchLinesJob?.join() - } - - private suspend fun control( - op: String, - payload: String? = null, - ) { - val instructions = bucketStorage.control(op, payload) - handleInstructions(instructions) + private suspend fun invokeControl(args: PowerSyncControlArguments) { + val instructions = bucketStorage.control(args) + instructions.forEach { handleInstruction(it) } } - private suspend fun control( - op: String, - payload: ByteArray, - ) { - val instructions = bucketStorage.control(op, payload) - handleInstructions(instructions) + suspend fun start() { + invokeControl(PowerSyncControlArguments.Start(params)) + + var hadSyncLine = false + for (line in controlInvocations) { + val instructions = bucketStorage.control(line) + instructions.forEach { handleInstruction(it) } + + if (!hadSyncLine && (line is PowerSyncControlArguments.TextLine || line is PowerSyncControlArguments.BinaryLine)) { + // Trigger a crud upload when receiving the first sync line: We could have + // pending local writes made while disconnected, so in addition to listening on + // updates to `ps_crud`, we also need to trigger a CRUD upload in some other + // cases. We do this on the first sync line because the client is likely to be + // online in that case. + hadSyncLine = true + triggerCrudUploadAsync() + } + } } - private suspend fun handleInstructions(instructions: List) { - instructions.forEach { handleInstruction(it) } + suspend fun stop() { + invokeControl(PowerSyncControlArguments.Stop) + fetchLinesJob?.join() } private suspend fun handleInstruction(instruction: Instruction) { @@ -344,20 +345,25 @@ internal class SyncStream( scope.launch { launch { logger.v { "listening for completed uploads" } - for (completion in completedCrudUploads) { - control("completed_upload") + controlInvocations.send(PowerSyncControlArguments.CompletedUpload) } } launch { connect(instruction) } + }.also { + it.invokeOnCompletion { + controlInvocations.close() + } } } Instruction.CloseSyncStream -> { + logger.v { "Closing sync stream connection" } fetchLinesJob!!.cancelAndJoin() fetchLinesJob = null + logger.v { "Sync stream connection shut down" } } Instruction.FlushSileSystem -> { // We have durable file systems, so flushing is not necessary @@ -389,6 +395,7 @@ internal class SyncStream( val job = scope.launch { connector.prefetchCredentials().join() + logger.v { "Stopping because new credentials are available" } // Token has been refreshed, start another iteration stop() @@ -409,36 +416,16 @@ internal class SyncStream( } } - /** - * Triggers a crud upload when called for the first time. - * - * We could have pending local writes made while disconnected, so in addition to listening - * on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We - * do this on the first sync line because the client is likely to be online in that case. - */ - private fun triggerCrudUploadIfFirstLine() { - if (!hadSyncLine) { - triggerCrudUploadAsync() - hadSyncLine = true - } - } - - private suspend fun line(text: String) { - control("line_text", text) - triggerCrudUploadIfFirstLine() - } - - private suspend fun line(blob: ByteArray) { - control("line_binary", blob) - triggerCrudUploadIfFirstLine() - } - private suspend fun connect(start: Instruction.EstablishSyncStream) { when (val method = options.method) { ConnectionMethod.Http -> - connectViaHttp(start.request).collect(this::line) + connectViaHttp(start.request).collect { + controlInvocations.send(PowerSyncControlArguments.TextLine(it)) + } is ConnectionMethod.WebSocket -> - connectViaWebSocket(start.request, method).collect(this::line) + connectViaWebSocket(start.request, method).collect { + controlInvocations.send(PowerSyncControlArguments.BinaryLine(it)) + } } } } From 5527331cc7d55061984267df83cd09d88c9de3d8 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Jul 2025 10:01:17 -0600 Subject: [PATCH 3/5] Reformat --- .../com/powersync/bucket/BucketStorage.kt | 25 +++++++++------- .../com/powersync/bucket/BucketStorageImpl.kt | 15 +++++----- .../kotlin/com/powersync/sync/SyncStream.kt | 30 +++++++++---------- 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 82c2e054..c67056bf 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -7,7 +7,6 @@ import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject internal interface BucketStorage { @@ -59,18 +58,22 @@ internal interface BucketStorage { internal sealed interface PowerSyncControlArguments { @Serializable class Start( - val parameters: JsonObject - ): PowerSyncControlArguments - data object Stop: PowerSyncControlArguments - - data class TextLine(val line: String): PowerSyncControlArguments - class BinaryLine(val line: ByteArray): PowerSyncControlArguments { - override fun toString(): String { - return "BinaryLine" - } + val parameters: JsonObject, + ) : PowerSyncControlArguments + + data object Stop : PowerSyncControlArguments + + data class TextLine( + val line: String, + ) : PowerSyncControlArguments + + class BinaryLine( + val line: ByteArray, + ) : PowerSyncControlArguments { + override fun toString(): String = "BinaryLine" } - data object CompletedUpload: PowerSyncControlArguments + data object CompletedUpload : PowerSyncControlArguments } @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 22038aed..9be5c84c 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -369,15 +369,16 @@ internal class BucketStorageImpl( db.writeTransaction { tx -> logger.v { "powersync_control: $args" } - val (op: String, data: Any?) = when (args) { - is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args) - PowerSyncControlArguments.Stop -> "stop" to null + val (op: String, data: Any?) = + when (args) { + is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args) + PowerSyncControlArguments.Stop -> "stop" to null - PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null + PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null - is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line - is PowerSyncControlArguments.TextLine -> "line_text" to args.line - } + is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line + is PowerSyncControlArguments.TextLine -> "line_text" to args.line + } tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult) } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 99442988..d509b0af 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -40,17 +40,14 @@ import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.consume import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.datetime.Clock -import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement @@ -342,22 +339,23 @@ internal class SyncStream( is Instruction.EstablishSyncStream -> { fetchLinesJob?.cancelAndJoin() fetchLinesJob = - scope.launch { - launch { - logger.v { "listening for completed uploads" } - for (completion in completedCrudUploads) { - controlInvocations.send(PowerSyncControlArguments.CompletedUpload) + scope + .launch { + launch { + logger.v { "listening for completed uploads" } + for (completion in completedCrudUploads) { + controlInvocations.send(PowerSyncControlArguments.CompletedUpload) + } } - } - launch { - connect(instruction) - } - }.also { - it.invokeOnCompletion { - controlInvocations.close() + launch { + connect(instruction) + } + }.also { + it.invokeOnCompletion { + controlInvocations.close() + } } - } } Instruction.CloseSyncStream -> { logger.v { "Closing sync stream connection" } From f0e15d6c2a4ec1272f994a4c2f37766efc6d3395 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Jul 2025 10:10:21 -0600 Subject: [PATCH 4/5] And another one --- .../kotlin/com/powersync/sync/SyncIntegrationTest.kt | 3 ++- core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 9668170a..4ef7d80a 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -467,6 +467,7 @@ abstract class BaseSyncIntegrationTest( val uploadStarted = CompletableDeferred() testConnector.uploadDataCallback = { db -> db.getCrudBatch()?.let { batch -> + logger.v { "connector: uploading crud batch" } uploadStarted.complete(Unit) completeUpload.await() batch.complete.invoke(null) @@ -478,7 +479,7 @@ abstract class BaseSyncIntegrationTest( turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) syncLines.send(SyncLine.KeepAlive(1234)) - turbine.waitFor { it.connected } + turbine.waitFor { it.connected && !it.uploading } turbine.cancelAndIgnoreRemainingEvents() } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index d509b0af..7ba8967d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -396,7 +396,7 @@ internal class SyncStream( logger.v { "Stopping because new credentials are available" } // Token has been refreshed, start another iteration - stop() + controlInvocations.send(PowerSyncControlArguments.Stop) } job.invokeOnCompletion { credentialsInvalidation = null From 5c6c6fef4b9b89d4a4ca6b7fc81a2a6c32fcdacb Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Jul 2025 10:14:27 -0600 Subject: [PATCH 5/5] Prepare release while we're at it --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index db7ae621..90d8dc53 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.2.1 +LIBRARY_VERSION=1.2.2 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/