Skip to content

Commit dfe2620

Browse files
authored
Refactor run wrapped (#217)
1 parent a2437d7 commit dfe2620

File tree

9 files changed

+73
-49
lines changed

9 files changed

+73
-49
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 1.2.3 (unreleased)
4+
5+
* Fix `runWrapped` catching cancellation exceptions.
6+
37
## 1.2.2
48

59
* Supabase: Avoid creating `Json` serializers multiple times.

connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import com.powersync.connectors.PowerSyncBackendConnector
66
import com.powersync.connectors.PowerSyncCredentials
77
import com.powersync.db.crud.CrudEntry
88
import com.powersync.db.crud.UpdateType
9-
import com.powersync.db.runWrappedSuspending
9+
import com.powersync.db.runWrapped
1010
import io.github.jan.supabase.SupabaseClient
1111
import io.github.jan.supabase.annotations.SupabaseInternal
1212
import io.github.jan.supabase.auth.Auth
@@ -115,7 +115,7 @@ public class SupabaseConnector(
115115
email: String,
116116
password: String,
117117
) {
118-
runWrappedSuspending {
118+
runWrapped {
119119
supabaseClient.auth.signInWith(Email) {
120120
this.email = email
121121
this.password = password
@@ -127,7 +127,7 @@ public class SupabaseConnector(
127127
email: String,
128128
password: String,
129129
) {
130-
runWrappedSuspending {
130+
runWrapped {
131131
supabaseClient.auth.signUpWith(Email) {
132132
this.email = email
133133
this.password = password
@@ -136,7 +136,7 @@ public class SupabaseConnector(
136136
}
137137

138138
public suspend fun signOut() {
139-
runWrappedSuspending {
139+
runWrapped {
140140
supabaseClient.auth.signOut()
141141
}
142142
}
@@ -146,7 +146,7 @@ public class SupabaseConnector(
146146
public val sessionStatus: StateFlow<SessionStatus> = supabaseClient.auth.sessionStatus
147147

148148
public suspend fun loginAnonymously() {
149-
runWrappedSuspending {
149+
runWrapped {
150150
supabaseClient.auth.signInAnonymously()
151151
}
152152
}
@@ -155,7 +155,7 @@ public class SupabaseConnector(
155155
* Get credentials for PowerSync.
156156
*/
157157
override suspend fun fetchCredentials(): PowerSyncCredentials =
158-
runWrappedSuspending {
158+
runWrapped {
159159
check(supabaseClient.auth.sessionStatus.value is SessionStatus.Authenticated) { "Supabase client is not authenticated" }
160160

161161
// Use Supabase token for PowerSync
@@ -178,8 +178,8 @@ public class SupabaseConnector(
178178
* If this call throws an error, it is retried periodically.
179179
*/
180180
override suspend fun uploadData(database: PowerSyncDatabase) {
181-
return runWrappedSuspending {
182-
val transaction = database.getNextCrudTransaction() ?: return@runWrappedSuspending
181+
return runWrapped {
182+
val transaction = database.getNextCrudTransaction() ?: return@runWrapped
183183

184184
var lastEntry: CrudEntry? = null
185185
try {
@@ -227,7 +227,7 @@ public class SupabaseConnector(
227227
Logger.e("Data upload error: ${e.message}")
228228
Logger.e("Discarding entry: $lastEntry")
229229
transaction.complete(null)
230-
return@runWrappedSuspending
230+
return@runWrapped
231231
}
232232

233233
Logger.e("Data upload error - retrying last entry: $lastEntry, $e")

core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import com.powersync.attachments.storage.IOLocalStorageAdapter
88
import com.powersync.attachments.sync.SyncingService
99
import com.powersync.db.getString
1010
import com.powersync.db.internal.ConnectionContext
11-
import com.powersync.db.runWrappedSuspending
11+
import com.powersync.db.runWrapped
1212
import kotlinx.coroutines.CoroutineScope
1313
import kotlinx.coroutines.Dispatchers
1414
import kotlinx.coroutines.IO
@@ -207,7 +207,7 @@ public open class AttachmentQueue(
207207
*/
208208
@Throws(PowerSyncException::class, CancellationException::class)
209209
public suspend fun startSync(): Unit =
210-
runWrappedSuspending {
210+
runWrapped {
211211
mutex.withLock {
212212
if (closed) {
213213
throw Exception("Attachment queue has been closed")
@@ -261,9 +261,9 @@ public open class AttachmentQueue(
261261
}
262262

263263
private suspend fun stopSyncingInternal(): Unit =
264-
runWrappedSuspending {
264+
runWrapped {
265265
if (closed) {
266-
return@runWrappedSuspending
266+
return@runWrapped
267267
}
268268

269269
syncStatusJob?.cancelAndJoin()
@@ -278,10 +278,10 @@ public open class AttachmentQueue(
278278
*/
279279
@Throws(PowerSyncException::class, CancellationException::class)
280280
public suspend fun close(): Unit =
281-
runWrappedSuspending {
281+
runWrapped {
282282
mutex.withLock {
283283
if (closed) {
284-
return@runWrappedSuspending
284+
return@runWrapped
285285
}
286286

287287
syncStatusJob?.cancelAndJoin()
@@ -322,7 +322,7 @@ public open class AttachmentQueue(
322322
*/
323323
@Throws(PowerSyncException::class, CancellationException::class)
324324
public open suspend fun processWatchedAttachments(items: List<WatchedAttachmentItem>): Unit =
325-
runWrappedSuspending {
325+
runWrapped {
326326
/**
327327
* Use a lock here to prevent conflicting state updates.
328328
*/
@@ -436,7 +436,7 @@ public open class AttachmentQueue(
436436
metaData: String? = null,
437437
updateHook: (context: ConnectionContext, attachment: Attachment) -> Unit,
438438
): Attachment =
439-
runWrappedSuspending {
439+
runWrapped {
440440
val id = db.get("SELECT uuid() as id") { it.getString("id") }
441441
val filename =
442442
resolveNewAttachmentFilename(attachmentId = id, fileExtension = fileExtension)
@@ -487,7 +487,7 @@ public open class AttachmentQueue(
487487
attachmentId: String,
488488
updateHook: (context: ConnectionContext, attachment: Attachment) -> Unit,
489489
): Attachment =
490-
runWrappedSuspending {
490+
runWrapped {
491491
attachmentsService.withContext { attachmentContext ->
492492
val attachment =
493493
attachmentContext.getAttachment(attachmentId)

core/src/commonMain/kotlin/com/powersync/attachments/storage/IOLocalStorageAdapter.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.powersync.attachments.storage
22

33
import com.powersync.attachments.LocalStorage
4-
import com.powersync.db.runWrappedSuspending
4+
import com.powersync.db.runWrapped
55
import io.ktor.utils.io.core.remaining
66
import kotlinx.coroutines.Dispatchers
77
import kotlinx.coroutines.IO
@@ -26,7 +26,7 @@ public open class IOLocalStorageAdapter(
2626
filePath: String,
2727
data: Flow<ByteArray>,
2828
): Long =
29-
runWrappedSuspending {
29+
runWrapped {
3030
withContext(Dispatchers.IO) {
3131
var totalSize = 0L
3232
fileSystem.sink(Path(filePath)).use { sink ->
@@ -65,28 +65,28 @@ public open class IOLocalStorageAdapter(
6565
}.flowOn(Dispatchers.IO)
6666

6767
public override suspend fun deleteFile(filePath: String): Unit =
68-
runWrappedSuspending {
68+
runWrapped {
6969
withContext(Dispatchers.IO) {
7070
fileSystem.delete(Path(filePath))
7171
}
7272
}
7373

7474
public override suspend fun fileExists(filePath: String): Boolean =
75-
runWrappedSuspending {
75+
runWrapped {
7676
withContext(Dispatchers.IO) {
7777
fileSystem.exists(Path(filePath))
7878
}
7979
}
8080

8181
public override suspend fun makeDir(path: String): Unit =
82-
runWrappedSuspending {
82+
runWrapped {
8383
withContext(Dispatchers.IO) {
8484
fileSystem.createDirectories(Path(path))
8585
}
8686
}
8787

8888
public override suspend fun rmDir(path: String): Unit =
89-
runWrappedSuspending {
89+
runWrapped {
9090
withContext(Dispatchers.IO) {
9191
for (item in fileSystem.list(Path(path))) {
9292
// Can't delete directories with files in them. Need to go down the file tree
@@ -105,7 +105,7 @@ public open class IOLocalStorageAdapter(
105105
sourcePath: String,
106106
targetPath: String,
107107
): Unit =
108-
runWrappedSuspending {
108+
runWrapped {
109109
withContext(Dispatchers.IO) {
110110
fileSystem.source(Path(sourcePath)).use { source ->
111111
fileSystem.sink(Path(targetPath)).use { sink ->

core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.powersync.connectors
22

33
import com.powersync.PowerSyncDatabase
44
import com.powersync.PowerSyncException
5-
import com.powersync.db.runWrappedSuspending
5+
import com.powersync.db.runWrapped
66
import kotlinx.coroutines.CoroutineScope
77
import kotlinx.coroutines.Dispatchers
88
import kotlinx.coroutines.Job
@@ -31,8 +31,8 @@ public abstract class PowerSyncBackendConnector {
3131
*/
3232
@Throws(PowerSyncException::class, CancellationException::class)
3333
public open suspend fun getCredentialsCached(): PowerSyncCredentials? {
34-
return runWrappedSuspending {
35-
cachedCredentials?.let { return@runWrappedSuspending it }
34+
return runWrapped {
35+
cachedCredentials?.let { return@runWrapped it }
3636
prefetchCredentials().join()
3737
cachedCredentials
3838
}

core/src/commonMain/kotlin/com/powersync/db/Functions.kt

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,10 @@ import co.touchlab.kermit.Logger
44
import com.powersync.PowerSyncException
55
import kotlinx.coroutines.CancellationException
66

7-
public fun <R> runWrapped(block: () -> R): R =
8-
try {
9-
block()
10-
} catch (t: Throwable) {
11-
if (t is PowerSyncException) {
12-
Logger.e("PowerSyncException: ${t.message}")
13-
throw t
14-
} else {
15-
Logger.e("PowerSyncException: ${t.message}")
16-
throw PowerSyncException(t.message ?: "Unknown internal exception", t)
17-
}
18-
}
19-
20-
public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R =
7+
/**
8+
* Runs the given [block], wrapping exceptions as [PowerSyncException]s.
9+
*/
10+
public inline fun <R> runWrapped(block: () -> R): R =
2111
try {
2212
block()
2313
} catch (t: Throwable) {
@@ -33,3 +23,9 @@ public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R =
3323
throw PowerSyncException(t.message ?: "Unknown internal exception", t)
3424
}
3525
}
26+
27+
@Deprecated("Use runWrapped instead", replaceWith = ReplaceWith("runWrapped"))
28+
public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R =
29+
runWrapped {
30+
block()
31+
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ internal class PowerSyncDatabaseImpl(
128128
}
129129

130130
override suspend fun updateSchema(schema: Schema) =
131-
runWrappedSuspending {
131+
runWrapped {
132132
waitReady()
133133
updateSchemaInternal(schema)
134134
}
@@ -515,7 +515,7 @@ internal class PowerSyncDatabaseImpl(
515515
}
516516

517517
override suspend fun close() =
518-
runWrappedSuspending {
518+
runWrapped {
519519
mutex.withLock {
520520
if (closed) {
521521
return@withLock

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import com.powersync.db.SqlCursor
77
import com.powersync.db.ThrowableLockCallback
88
import com.powersync.db.ThrowableTransactionCallback
99
import com.powersync.db.runWrapped
10-
import com.powersync.db.runWrappedSuspending
1110
import com.powersync.utils.AtomicMutableSet
1211
import com.powersync.utils.JsonUtil
1312
import com.powersync.utils.throttle
@@ -24,7 +23,6 @@ import kotlinx.coroutines.flow.transform
2423
import kotlinx.coroutines.sync.Mutex
2524
import kotlinx.coroutines.sync.withLock
2625
import kotlinx.coroutines.withContext
27-
import kotlinx.serialization.encodeToString
2826
import kotlin.time.Duration.Companion.milliseconds
2927

3028
@OptIn(FlowPreview::class)
@@ -67,7 +65,7 @@ internal class InternalDatabaseImpl(
6765

6866
override suspend fun updateSchema(schemaJson: String) {
6967
withContext(dbContext) {
70-
runWrappedSuspending {
68+
runWrapped {
7169
// First get a lock on all read connections
7270
readPool.withAllConnections { readConnections ->
7371
// Then get access to the write connection
@@ -183,7 +181,7 @@ internal class InternalDatabaseImpl(
183181
*/
184182
private suspend fun <R> internalReadLock(callback: (TransactorDriver) -> R): R =
185183
withContext(dbContext) {
186-
runWrappedSuspending {
184+
runWrapped {
187185
readPool.withConnection {
188186
catchSwiftExceptions {
189187
callback(it)
@@ -295,7 +293,7 @@ internal class InternalDatabaseImpl(
295293
}
296294

297295
override suspend fun close() {
298-
runWrappedSuspending {
296+
runWrapped {
299297
writeConnection.driver.close()
300298
readPool.close()
301299
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.powersync.db
2+
3+
import com.powersync.PowerSyncException
4+
import io.kotest.assertions.throwables.shouldThrow
5+
import kotlinx.coroutines.CancellationException
6+
import kotlin.test.Test
7+
8+
class FunctionTest {
9+
@Test
10+
fun `runWrapped reports exceptions as powersync exception`() {
11+
shouldThrow<PowerSyncException> {
12+
runWrapped {
13+
error("test")
14+
}
15+
}
16+
}
17+
18+
@Test
19+
fun `runWrapped does not wrap cancellation exceptions`() {
20+
shouldThrow<CancellationException> {
21+
runWrapped {
22+
throw CancellationException("test")
23+
}
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)