Skip to content

Fix crud uploads for websockets #212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.2.2

* Supabase: Avoid creating `Json` serializers multiple times.
* Fix local writes not being uploaded correctly when using WebSockets as a transport protocol.

## 1.2.1

* [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ abstract class BaseSyncIntegrationTest(
val uploadStarted = CompletableDeferred<Unit>()
testConnector.uploadDataCallback = { db ->
db.getCrudBatch()?.let { batch ->
logger.v { "connector: uploading crud batch" }
uploadStarted.complete(Unit)
completeUpload.await()
batch.complete.invoke(null)
Expand All @@ -478,7 +479,7 @@ abstract class BaseSyncIntegrationTest(
turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
syncLines.send(SyncLine.KeepAlive(1234))
turbine.waitFor { it.connected }
turbine.waitFor { it.connected && !it.uploading }
turbine.cancelAndIgnoreRemainingEvents()
}

Expand Down Expand Up @@ -675,7 +676,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
verifyNoMoreCalls(connector)

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10))
prefetchCalled.complete(Unit)
prefetchCalled.await()
// Should still be connected before prefetch completes
database.currentStatus.connected shouldBe true

Expand Down
36 changes: 28 additions & 8 deletions core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.powersync.sync.Instruction
import com.powersync.sync.LegacySyncImplementation
import com.powersync.sync.SyncDataBatch
import com.powersync.sync.SyncLocalDatabaseResult
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject

internal interface BucketStorage {
fun getMaxOpId(): String
Expand Down Expand Up @@ -50,13 +52,31 @@ internal interface BucketStorage {
partialPriority: BucketPriority? = null,
): SyncLocalDatabaseResult

suspend fun control(
op: String,
payload: String?,
): List<Instruction>
suspend fun control(args: PowerSyncControlArguments): List<Instruction>
}

internal sealed interface PowerSyncControlArguments {
@Serializable
class Start(
val parameters: JsonObject,
) : PowerSyncControlArguments

data object Stop : PowerSyncControlArguments

suspend fun control(
op: String,
payload: ByteArray,
): List<Instruction>
data class TextLine(
val line: String,
) : PowerSyncControlArguments

class BinaryLine(
val line: ByteArray,
) : PowerSyncControlArguments {
override fun toString(): String = "BinaryLine"
}

data object CompletedUpload : PowerSyncControlArguments
}

@Serializable
internal class StartSyncIteration(
val parameters: JsonObject,
)
Original file line number Diff line number Diff line change
Expand Up @@ -365,22 +365,21 @@ internal class BucketStorageImpl(
return JsonUtil.json.decodeFromString<List<Instruction>>(result)
}

override suspend fun control(
op: String,
payload: String?,
): List<Instruction> =
override suspend fun control(args: PowerSyncControlArguments): List<Instruction> =
db.writeTransaction { tx ->
logger.v { "powersync_control($op, $payload)" }
logger.v { "powersync_control: $args" }

tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult)
}
val (op: String, data: Any?) =
when (args) {
is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args)
PowerSyncControlArguments.Stop -> "stop" to null

override suspend fun control(
op: String,
payload: ByteArray,
): List<Instruction> =
db.writeTransaction { tx ->
logger.v { "powersync_control($op, binary payload)" }
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, payload), ::handleControlResult)
PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null

is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line
is PowerSyncControlArguments.TextLine -> "line_text" to args.line
}

tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult)
}
}
119 changes: 52 additions & 67 deletions core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.powersync.bucket.BucketChecksum
import com.powersync.bucket.BucketRequest
import com.powersync.bucket.BucketStorage
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.PowerSyncControlArguments
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.db.crud.CrudEntry
Expand Down Expand Up @@ -47,7 +48,6 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.datetime.Clock
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.encodeToJsonElement
Expand Down Expand Up @@ -297,67 +297,71 @@ internal class SyncStream(
*/
private inner class ActiveIteration(
val scope: CoroutineScope,
var hadSyncLine: Boolean = false,
var fetchLinesJob: Job? = null,
var credentialsInvalidation: Job? = null,
) {
suspend fun start() {
@Serializable
class StartParameters(
val parameters: JsonObject,
)

control("start", JsonUtil.json.encodeToString(StartParameters(params)))
fetchLinesJob?.join()
}

suspend fun stop() {
control("stop")
fetchLinesJob?.join()
}
// Using a channel for control invocations so that they're handled by a single coroutine,
// avoiding races between concurrent jobs like fetching credentials.
private val controlInvocations = Channel<PowerSyncControlArguments>()

private suspend fun control(
op: String,
payload: String? = null,
) {
val instructions = bucketStorage.control(op, payload)
handleInstructions(instructions)
private suspend fun invokeControl(args: PowerSyncControlArguments) {
val instructions = bucketStorage.control(args)
instructions.forEach { handleInstruction(it) }
}

private suspend fun control(
op: String,
payload: ByteArray,
) {
val instructions = bucketStorage.control(op, payload)
handleInstructions(instructions)
suspend fun start() {
invokeControl(PowerSyncControlArguments.Start(params))

var hadSyncLine = false
for (line in controlInvocations) {
val instructions = bucketStorage.control(line)
instructions.forEach { handleInstruction(it) }

if (!hadSyncLine && (line is PowerSyncControlArguments.TextLine || line is PowerSyncControlArguments.BinaryLine)) {
// Trigger a crud upload when receiving the first sync line: We could have
// pending local writes made while disconnected, so in addition to listening on
// updates to `ps_crud`, we also need to trigger a CRUD upload in some other
// cases. We do this on the first sync line because the client is likely to be
// online in that case.
hadSyncLine = true
triggerCrudUploadAsync()
}
}
}

private suspend fun handleInstructions(instructions: List<Instruction>) {
instructions.forEach { handleInstruction(it) }
suspend fun stop() {
invokeControl(PowerSyncControlArguments.Stop)
fetchLinesJob?.join()
}

private suspend fun handleInstruction(instruction: Instruction) {
when (instruction) {
is Instruction.EstablishSyncStream -> {
fetchLinesJob?.cancelAndJoin()
fetchLinesJob =
scope.launch {
launch {
logger.v { "listening for completed uploads" }

for (completion in completedCrudUploads) {
control("completed_upload")
scope
.launch {
launch {
logger.v { "listening for completed uploads" }
for (completion in completedCrudUploads) {
controlInvocations.send(PowerSyncControlArguments.CompletedUpload)
}
}
}

launch {
connect(instruction)
launch {
connect(instruction)
}
}.also {
it.invokeOnCompletion {
controlInvocations.close()
}
}
}
}
Instruction.CloseSyncStream -> {
logger.v { "Closing sync stream connection" }
fetchLinesJob!!.cancelAndJoin()
fetchLinesJob = null
logger.v { "Sync stream connection shut down" }
}
Instruction.FlushSileSystem -> {
// We have durable file systems, so flushing is not necessary
Expand Down Expand Up @@ -389,9 +393,10 @@ internal class SyncStream(
val job =
scope.launch {
connector.prefetchCredentials().join()
logger.v { "Stopping because new credentials are available" }

// Token has been refreshed, start another iteration
stop()
controlInvocations.send(PowerSyncControlArguments.Stop)
}
job.invokeOnCompletion {
credentialsInvalidation = null
Expand All @@ -409,36 +414,16 @@ internal class SyncStream(
}
}

/**
* Triggers a crud upload when called for the first time.
*
* We could have pending local writes made while disconnected, so in addition to listening
* on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We
* do this on the first sync line because the client is likely to be online in that case.
*/
private fun triggerCrudUploadIfFirstLine() {
if (!hadSyncLine) {
triggerCrudUploadAsync()
hadSyncLine = true
}
}

private suspend fun line(text: String) {
triggerCrudUploadIfFirstLine()
control("line_text", text)
}

private suspend fun line(blob: ByteArray) {
triggerCrudUploadIfFirstLine()
control("line_binary", blob)
}

private suspend fun connect(start: Instruction.EstablishSyncStream) {
when (val method = options.method) {
ConnectionMethod.Http ->
connectViaHttp(start.request).collect(this::line)
connectViaHttp(start.request).collect {
controlInvocations.send(PowerSyncControlArguments.TextLine(it))
}
is ConnectionMethod.WebSocket ->
connectViaWebSocket(start.request, method).collect(this::line)
connectViaWebSocket(start.request, method).collect {
controlInvocations.send(PowerSyncControlArguments.BinaryLine(it))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ development=true
RELEASE_SIGNING_ENABLED=true
# Library config
GROUP=com.powersync
LIBRARY_VERSION=1.2.1
LIBRARY_VERSION=1.2.2
GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git
# POM
POM_URL=https://github.com/powersync-ja/powersync-kotlin/
Expand Down