diff --git a/CHANGELOG.md b/CHANGELOG.md index de37ceba..3e36294e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.2.2 + +* Supabase: Avoid creating `Json` serializers multiple times. +* Fix local writes not being uploaded correctly when using WebSockets as a transport protocol. + ## 1.2.1 * [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index f780a0d7..4ef7d80a 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -467,6 +467,7 @@ abstract class BaseSyncIntegrationTest( val uploadStarted = CompletableDeferred() testConnector.uploadDataCallback = { db -> db.getCrudBatch()?.let { batch -> + logger.v { "connector: uploading crud batch" } uploadStarted.complete(Unit) completeUpload.await() batch.complete.invoke(null) @@ -478,7 +479,7 @@ abstract class BaseSyncIntegrationTest( turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) syncLines.send(SyncLine.KeepAlive(1234)) - turbine.waitFor { it.connected } + turbine.waitFor { it.connected && !it.uploading } turbine.cancelAndIgnoreRemainingEvents() } @@ -675,7 +676,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { verifyNoMoreCalls(connector) syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10)) - prefetchCalled.complete(Unit) + prefetchCalled.await() // Should still be connected before prefetch completes database.currentStatus.connected shouldBe true diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 6741777c..c67056bf 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -6,6 +6,8 @@ import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonObject internal interface BucketStorage { fun getMaxOpId(): String @@ -50,13 +52,31 @@ internal interface BucketStorage { partialPriority: BucketPriority? = null, ): SyncLocalDatabaseResult - suspend fun control( - op: String, - payload: String?, - ): List + suspend fun control(args: PowerSyncControlArguments): List +} + +internal sealed interface PowerSyncControlArguments { + @Serializable + class Start( + val parameters: JsonObject, + ) : PowerSyncControlArguments + + data object Stop : PowerSyncControlArguments - suspend fun control( - op: String, - payload: ByteArray, - ): List + data class TextLine( + val line: String, + ) : PowerSyncControlArguments + + class BinaryLine( + val line: ByteArray, + ) : PowerSyncControlArguments { + override fun toString(): String = "BinaryLine" + } + + data object CompletedUpload : PowerSyncControlArguments } + +@Serializable +internal class StartSyncIteration( + val parameters: JsonObject, +) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 1bbc3a9e..9be5c84c 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -365,22 +365,21 @@ internal class BucketStorageImpl( return JsonUtil.json.decodeFromString>(result) } - override suspend fun control( - op: String, - payload: String?, - ): List = + override suspend fun control(args: PowerSyncControlArguments): List = db.writeTransaction { tx -> - logger.v { "powersync_control($op, $payload)" } + logger.v { "powersync_control: $args" } - tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult) - } + val (op: String, data: Any?) = + when (args) { + is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args) + PowerSyncControlArguments.Stop -> "stop" to null - override suspend fun control( - op: String, - payload: ByteArray, - ): List = - db.writeTransaction { tx -> - logger.v { "powersync_control($op, binary payload)" } - tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult) + PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null + + is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line + is PowerSyncControlArguments.TextLine -> "line_text" to args.line + } + + tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult) } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 9559f6fb..7ba8967d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -8,6 +8,7 @@ import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest import com.powersync.bucket.BucketStorage import com.powersync.bucket.Checkpoint +import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry @@ -47,7 +48,6 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.datetime.Clock -import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement @@ -297,43 +297,41 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, - var hadSyncLine: Boolean = false, var fetchLinesJob: Job? = null, var credentialsInvalidation: Job? = null, ) { - suspend fun start() { - @Serializable - class StartParameters( - val parameters: JsonObject, - ) - - control("start", JsonUtil.json.encodeToString(StartParameters(params))) - fetchLinesJob?.join() - } - - suspend fun stop() { - control("stop") - fetchLinesJob?.join() - } + // Using a channel for control invocations so that they're handled by a single coroutine, + // avoiding races between concurrent jobs like fetching credentials. + private val controlInvocations = Channel() - private suspend fun control( - op: String, - payload: String? = null, - ) { - val instructions = bucketStorage.control(op, payload) - handleInstructions(instructions) + private suspend fun invokeControl(args: PowerSyncControlArguments) { + val instructions = bucketStorage.control(args) + instructions.forEach { handleInstruction(it) } } - private suspend fun control( - op: String, - payload: ByteArray, - ) { - val instructions = bucketStorage.control(op, payload) - handleInstructions(instructions) + suspend fun start() { + invokeControl(PowerSyncControlArguments.Start(params)) + + var hadSyncLine = false + for (line in controlInvocations) { + val instructions = bucketStorage.control(line) + instructions.forEach { handleInstruction(it) } + + if (!hadSyncLine && (line is PowerSyncControlArguments.TextLine || line is PowerSyncControlArguments.BinaryLine)) { + // Trigger a crud upload when receiving the first sync line: We could have + // pending local writes made while disconnected, so in addition to listening on + // updates to `ps_crud`, we also need to trigger a CRUD upload in some other + // cases. We do this on the first sync line because the client is likely to be + // online in that case. + hadSyncLine = true + triggerCrudUploadAsync() + } + } } - private suspend fun handleInstructions(instructions: List) { - instructions.forEach { handleInstruction(it) } + suspend fun stop() { + invokeControl(PowerSyncControlArguments.Stop) + fetchLinesJob?.join() } private suspend fun handleInstruction(instruction: Instruction) { @@ -341,23 +339,29 @@ internal class SyncStream( is Instruction.EstablishSyncStream -> { fetchLinesJob?.cancelAndJoin() fetchLinesJob = - scope.launch { - launch { - logger.v { "listening for completed uploads" } - - for (completion in completedCrudUploads) { - control("completed_upload") + scope + .launch { + launch { + logger.v { "listening for completed uploads" } + for (completion in completedCrudUploads) { + controlInvocations.send(PowerSyncControlArguments.CompletedUpload) + } } - } - launch { - connect(instruction) + launch { + connect(instruction) + } + }.also { + it.invokeOnCompletion { + controlInvocations.close() + } } - } } Instruction.CloseSyncStream -> { + logger.v { "Closing sync stream connection" } fetchLinesJob!!.cancelAndJoin() fetchLinesJob = null + logger.v { "Sync stream connection shut down" } } Instruction.FlushSileSystem -> { // We have durable file systems, so flushing is not necessary @@ -389,9 +393,10 @@ internal class SyncStream( val job = scope.launch { connector.prefetchCredentials().join() + logger.v { "Stopping because new credentials are available" } // Token has been refreshed, start another iteration - stop() + controlInvocations.send(PowerSyncControlArguments.Stop) } job.invokeOnCompletion { credentialsInvalidation = null @@ -409,36 +414,16 @@ internal class SyncStream( } } - /** - * Triggers a crud upload when called for the first time. - * - * We could have pending local writes made while disconnected, so in addition to listening - * on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We - * do this on the first sync line because the client is likely to be online in that case. - */ - private fun triggerCrudUploadIfFirstLine() { - if (!hadSyncLine) { - triggerCrudUploadAsync() - hadSyncLine = true - } - } - - private suspend fun line(text: String) { - triggerCrudUploadIfFirstLine() - control("line_text", text) - } - - private suspend fun line(blob: ByteArray) { - triggerCrudUploadIfFirstLine() - control("line_binary", blob) - } - private suspend fun connect(start: Instruction.EstablishSyncStream) { when (val method = options.method) { ConnectionMethod.Http -> - connectViaHttp(start.request).collect(this::line) + connectViaHttp(start.request).collect { + controlInvocations.send(PowerSyncControlArguments.TextLine(it)) + } is ConnectionMethod.WebSocket -> - connectViaWebSocket(start.request, method).collect(this::line) + connectViaWebSocket(start.request, method).collect { + controlInvocations.send(PowerSyncControlArguments.BinaryLine(it)) + } } } } diff --git a/gradle.properties b/gradle.properties index db7ae621..90d8dc53 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.2.1 +LIBRARY_VERSION=1.2.2 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/