@@ -8,6 +8,7 @@ import com.powersync.bucket.BucketChecksum
88import com.powersync.bucket.BucketRequest
99import com.powersync.bucket.BucketStorage
1010import com.powersync.bucket.Checkpoint
11+ import com.powersync.bucket.PowerSyncControlArguments
1112import com.powersync.bucket.WriteCheckpointResponse
1213import com.powersync.connectors.PowerSyncBackendConnector
1314import com.powersync.db.crud.CrudEntry
@@ -47,7 +48,6 @@ import kotlinx.coroutines.flow.flow
4748import kotlinx.coroutines.launch
4849import kotlinx.coroutines.withContext
4950import kotlinx.datetime.Clock
50- import kotlinx.serialization.Serializable
5151import kotlinx.serialization.json.JsonElement
5252import kotlinx.serialization.json.JsonObject
5353import kotlinx.serialization.json.encodeToJsonElement
@@ -297,67 +297,71 @@ internal class SyncStream(
297297 */
298298 private inner class ActiveIteration (
299299 val scope : CoroutineScope ,
300- var hadSyncLine : Boolean = false ,
301300 var fetchLinesJob : Job ? = null ,
302301 var credentialsInvalidation : Job ? = null ,
303302 ) {
304- suspend fun start () {
305- @Serializable
306- class StartParameters (
307- val parameters : JsonObject ,
308- )
309-
310- control(" start" , JsonUtil .json.encodeToString(StartParameters (params)))
311- fetchLinesJob?.join()
312- }
313-
314- suspend fun stop () {
315- control(" stop" )
316- fetchLinesJob?.join()
317- }
303+ // Using a channel for control invocations so that they're handled by a single coroutine,
304+ // avoiding races between concurrent jobs like fetching credentials.
305+ private val controlInvocations = Channel <PowerSyncControlArguments >()
318306
319- private suspend fun control (
320- op : String ,
321- payload : String? = null,
322- ) {
323- val instructions = bucketStorage.control(op, payload)
324- handleInstructions(instructions)
307+ private suspend fun invokeControl (args : PowerSyncControlArguments ) {
308+ val instructions = bucketStorage.control(args)
309+ instructions.forEach { handleInstruction(it) }
325310 }
326311
327- private suspend fun control (
328- op : String ,
329- payload : ByteArray ,
330- ) {
331- val instructions = bucketStorage.control(op, payload)
332- handleInstructions(instructions)
312+ suspend fun start () {
313+ invokeControl(PowerSyncControlArguments .Start (params))
314+
315+ var hadSyncLine = false
316+ for (line in controlInvocations) {
317+ val instructions = bucketStorage.control(line)
318+ instructions.forEach { handleInstruction(it) }
319+
320+ if (! hadSyncLine && (line is PowerSyncControlArguments .TextLine || line is PowerSyncControlArguments .BinaryLine )) {
321+ // Trigger a crud upload when receiving the first sync line: We could have
322+ // pending local writes made while disconnected, so in addition to listening on
323+ // updates to `ps_crud`, we also need to trigger a CRUD upload in some other
324+ // cases. We do this on the first sync line because the client is likely to be
325+ // online in that case.
326+ hadSyncLine = true
327+ triggerCrudUploadAsync()
328+ }
329+ }
333330 }
334331
335- private suspend fun handleInstructions (instructions : List <Instruction >) {
336- instructions.forEach { handleInstruction(it) }
332+ suspend fun stop () {
333+ invokeControl(PowerSyncControlArguments .Stop )
334+ fetchLinesJob?.join()
337335 }
338336
339337 private suspend fun handleInstruction (instruction : Instruction ) {
340338 when (instruction) {
341339 is Instruction .EstablishSyncStream -> {
342340 fetchLinesJob?.cancelAndJoin()
343341 fetchLinesJob =
344- scope.launch {
345- launch {
346- logger.v { " listening for completed uploads" }
347-
348- for (completion in completedCrudUploads) {
349- control(" completed_upload" )
342+ scope
343+ .launch {
344+ launch {
345+ logger.v { " listening for completed uploads" }
346+ for (completion in completedCrudUploads) {
347+ controlInvocations.send(PowerSyncControlArguments .CompletedUpload )
348+ }
350349 }
351- }
352350
353- launch {
354- connect(instruction)
351+ launch {
352+ connect(instruction)
353+ }
354+ }.also {
355+ it.invokeOnCompletion {
356+ controlInvocations.close()
357+ }
355358 }
356- }
357359 }
358360 Instruction .CloseSyncStream -> {
361+ logger.v { " Closing sync stream connection" }
359362 fetchLinesJob!! .cancelAndJoin()
360363 fetchLinesJob = null
364+ logger.v { " Sync stream connection shut down" }
361365 }
362366 Instruction .FlushSileSystem -> {
363367 // We have durable file systems, so flushing is not necessary
@@ -389,9 +393,10 @@ internal class SyncStream(
389393 val job =
390394 scope.launch {
391395 connector.prefetchCredentials().join()
396+ logger.v { " Stopping because new credentials are available" }
392397
393398 // Token has been refreshed, start another iteration
394- stop( )
399+ controlInvocations.send( PowerSyncControlArguments . Stop )
395400 }
396401 job.invokeOnCompletion {
397402 credentialsInvalidation = null
@@ -409,36 +414,16 @@ internal class SyncStream(
409414 }
410415 }
411416
412- /* *
413- * Triggers a crud upload when called for the first time.
414- *
415- * We could have pending local writes made while disconnected, so in addition to listening
416- * on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We
417- * do this on the first sync line because the client is likely to be online in that case.
418- */
419- private fun triggerCrudUploadIfFirstLine () {
420- if (! hadSyncLine) {
421- triggerCrudUploadAsync()
422- hadSyncLine = true
423- }
424- }
425-
426- private suspend fun line (text : String ) {
427- triggerCrudUploadIfFirstLine()
428- control(" line_text" , text)
429- }
430-
431- private suspend fun line (blob : ByteArray ) {
432- triggerCrudUploadIfFirstLine()
433- control(" line_binary" , blob)
434- }
435-
436417 private suspend fun connect (start : Instruction .EstablishSyncStream ) {
437418 when (val method = options.method) {
438419 ConnectionMethod .Http ->
439- connectViaHttp(start.request).collect(this ::line)
420+ connectViaHttp(start.request).collect {
421+ controlInvocations.send(PowerSyncControlArguments .TextLine (it))
422+ }
440423 is ConnectionMethod .WebSocket ->
441- connectViaWebSocket(start.request, method).collect(this ::line)
424+ connectViaWebSocket(start.request, method).collect {
425+ controlInvocations.send(PowerSyncControlArguments .BinaryLine (it))
426+ }
442427 }
443428 }
444429 }
0 commit comments