Skip to content

Commit d306593

Browse files
feat: ability to provide custom dispatch implementations
1 parent c258569 commit d306593

File tree

7 files changed

+278
-134
lines changed

7 files changed

+278
-134
lines changed

CHANGELOG.md

Lines changed: 135 additions & 120 deletions
Large diffs are not rendered by default.
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package com.powersync
2+
3+
import kotlinx.coroutines.CoroutineDispatcher
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.IO
6+
import kotlinx.coroutines.withContext
7+
8+
/**
9+
* Function interface for dispatching database operations to a specific coroutine context.
10+
*
11+
* By default, operations are dispatched to [Dispatchers.IO]. Custom implementations
12+
* can be provided to control the execution context of database operations.
13+
*
14+
* This interface supports the `operator invoke` syntax, allowing you to call it like:
15+
* ```
16+
* dispatchFunction { /* your code */ }
17+
* ```
18+
*
19+
* **Design Note:** This must be an interface (not a function type) because Kotlin does not
20+
* support function types with generic type parameters. Since the dispatch function needs to
21+
* accept and return generic types `<R>`, an interface with an `operator invoke` method is
22+
* the appropriate solution. This allows the same convenient syntax as function types while
23+
* supporting generics.
24+
*
25+
* @see DispatchStrategy for dispatch strategy options
26+
*/
27+
public interface DispatchFunction {
28+
/**
29+
* Dispatches the given block to the appropriate coroutine context.
30+
*
31+
* @param block The suspend function to execute in the dispatch context.
32+
* @return The result of executing the block.
33+
*/
34+
public suspend operator fun <R> invoke(block: suspend () -> R): R
35+
}
36+
37+
/**
38+
* Strategy for dispatching database operations to a specific coroutine context.
39+
*
40+
* This sealed class allows you to specify how database operations should be dispatched:
41+
* - [Default]: Use the default dispatcher ([Dispatchers.IO])
42+
* - [Dispatcher]: Use a specific [CoroutineDispatcher]
43+
* - [Custom]: Use a custom [DispatchFunction] for full control
44+
*
45+
* Each variant provides a [dispatchFunction] that implements the actual dispatching logic.
46+
*
47+
* Example usage:
48+
* ```
49+
* // Use default (Dispatchers.IO) - this is the default if not specified
50+
* PowerSyncDatabase(factory, schema)
51+
* // or explicitly:
52+
* PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Default)
53+
*
54+
* // Use a specific dispatcher
55+
* PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Dispatcher(Dispatchers.Default))
56+
*
57+
* // Use a custom function
58+
* PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Custom(myCustomFunction))
59+
* ```
60+
*
61+
* @see DispatchFunction for the dispatch function interface
62+
*/
63+
public sealed class DispatchStrategy {
64+
/**
65+
* Returns the [DispatchFunction] that implements the dispatching logic for this strategy.
66+
*/
67+
public abstract val dispatchFunction: DispatchFunction
68+
69+
/**
70+
* Use the default dispatcher ([Dispatchers.IO]) for database operations.
71+
*
72+
* This is the recommended default for most use cases, as it provides
73+
* a dedicated thread pool for I/O-bound operations.
74+
*/
75+
public object Default : DispatchStrategy() {
76+
override val dispatchFunction: DispatchFunction =
77+
Dispatcher(Dispatchers.IO).dispatchFunction
78+
}
79+
80+
/**
81+
* Use a specific [CoroutineDispatcher] for database operations.
82+
*
83+
* This allows you to use any coroutine dispatcher, such as:
84+
* - [Dispatchers.Default] for CPU-bound work
85+
* - [Dispatchers.Main] for UI operations
86+
* - A custom dispatcher for your specific needs
87+
*
88+
* @property dispatcher The coroutine dispatcher to use.
89+
*/
90+
public data class Dispatcher(
91+
val dispatcher: CoroutineDispatcher,
92+
) : DispatchStrategy() {
93+
override val dispatchFunction: DispatchFunction =
94+
object : DispatchFunction {
95+
override suspend fun <R> invoke(block: suspend () -> R): R = withContext(dispatcher) { block() }
96+
}
97+
}
98+
99+
/**
100+
* Use a custom [DispatchFunction] for full control over dispatching.
101+
*
102+
* @property function The custom dispatch function to use.
103+
*/
104+
public data class Custom(
105+
val function: DispatchFunction,
106+
) : DispatchStrategy() {
107+
override val dispatchFunction: DispatchFunction = function
108+
}
109+
}

common/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,10 @@ public interface PowerSyncDatabase : Queries {
252252
schema: Schema,
253253
identifier: String,
254254
logger: Logger,
255+
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
255256
): PowerSyncDatabase {
256257
val group = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
257-
return openedWithGroup(pool, scope, schema, logger, group)
258+
return openedWithGroup(pool, scope, schema, logger, group, dispatchStrategy)
258259
}
259260

260261
/**
@@ -268,18 +269,21 @@ public interface PowerSyncDatabase : Queries {
268269
schema: Schema,
269270
scope: CoroutineScope,
270271
logger: Logger? = null,
272+
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
271273
): PowerSyncDatabase {
272274
val logger = generateLogger(logger)
273275
// Since this returns a fresh in-memory database every time, use a fresh group to avoid warnings about the
274276
// same database being opened multiple times.
275-
val collection = ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test")
277+
val collection =
278+
ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test")
276279

277280
return openedWithGroup(
278281
SingleConnectionPool(factory.openInMemoryConnection()),
279282
scope,
280283
schema,
281284
logger,
282285
collection,
286+
dispatchStrategy,
283287
)
284288
}
285289

@@ -289,13 +293,15 @@ public interface PowerSyncDatabase : Queries {
289293
schema: Schema,
290294
logger: Logger,
291295
group: Pair<ActiveDatabaseResource, Any>,
296+
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
292297
): PowerSyncDatabase =
293298
PowerSyncDatabaseImpl(
294299
schema,
295300
scope,
296301
pool,
297302
logger,
298303
group,
304+
dispatchStrategy,
299305
)
300306
}
301307
}

common/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public fun PowerSyncDatabase(
2525
dbFilename: String = DEFAULT_DB_FILENAME,
2626
scope: CoroutineScope = GlobalScope,
2727
logger: Logger? = null,
28+
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
2829
/**
2930
* Optional database file directory path.
3031
* This parameter is ignored for iOS.
@@ -40,17 +41,18 @@ public fun PowerSyncDatabase(
4041
scope = scope,
4142
logger = generatedLogger,
4243
dbDirectory = dbDirectory,
44+
dispatchStrategy = dispatchStrategy,
4345
)
4446
}
4547

46-
@OptIn(ExperimentalPowerSyncAPI::class)
4748
internal fun createPowerSyncDatabaseImpl(
4849
factory: PersistentConnectionFactory,
4950
schema: Schema,
5051
dbFilename: String,
5152
scope: CoroutineScope,
5253
logger: Logger,
5354
dbDirectory: String?,
55+
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
5456
): PowerSyncDatabaseImpl {
5557
val identifier = dbDirectory + dbFilename
5658
val activeDatabaseGroup = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
@@ -72,5 +74,6 @@ internal fun createPowerSyncDatabaseImpl(
7274
schema,
7375
logger,
7476
activeDatabaseGroup,
77+
dispatchStrategy,
7578
) as PowerSyncDatabaseImpl
7679
}

common/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.powersync.db
22

33
import co.touchlab.kermit.Logger
4+
import com.powersync.DispatchStrategy
45
import com.powersync.ExperimentalPowerSyncAPI
56
import com.powersync.PowerSyncDatabase
67
import com.powersync.PowerSyncException
@@ -61,6 +62,7 @@ internal class PowerSyncDatabaseImpl(
6162
pool: SQLiteConnectionPool,
6263
val logger: Logger,
6364
private val activeDatabaseGroup: Pair<ActiveDatabaseResource, Any>,
65+
dispatchStrategy: DispatchStrategy,
6466
) : PowerSyncDatabase {
6567
companion object {
6668
internal val streamConflictMessage =
@@ -79,7 +81,7 @@ internal class PowerSyncDatabaseImpl(
7981
private val resource = activeDatabaseGroup.first
8082
private val streams = StreamTracker(this)
8183

82-
private val internalDb = InternalDatabaseImpl(pool, logger)
84+
private val internalDb = InternalDatabaseImpl(pool, logger, dispatchStrategy = dispatchStrategy)
8385

8486
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
8587

@@ -391,7 +393,7 @@ internal class PowerSyncDatabaseImpl(
391393

392394
override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R {
393395
waitReady()
394-
return internalDb.writeTransaction(callback)
396+
return internalDb.readTransaction(callback)
395397
}
396398

397399
override suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R {

common/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.powersync.db.internal
22

33
import co.touchlab.kermit.Logger
4+
import com.powersync.DispatchFunction
5+
import com.powersync.DispatchStrategy
46
import com.powersync.ExperimentalPowerSyncAPI
57
import com.powersync.db.SqlCursor
68
import com.powersync.db.ThrowableLockCallback
@@ -11,26 +13,21 @@ import com.powersync.db.runWrapped
1113
import com.powersync.utils.AtomicMutableSet
1214
import com.powersync.utils.JsonUtil
1315
import com.powersync.utils.throttle
14-
import kotlinx.coroutines.Dispatchers
15-
import kotlinx.coroutines.IO
1616
import kotlinx.coroutines.flow.Flow
1717
import kotlinx.coroutines.flow.SharedFlow
1818
import kotlinx.coroutines.flow.emitAll
1919
import kotlinx.coroutines.flow.flow
2020
import kotlinx.coroutines.flow.map
2121
import kotlinx.coroutines.flow.onSubscription
2222
import kotlinx.coroutines.flow.transform
23-
import kotlinx.coroutines.withContext
2423
import kotlin.time.Duration.Companion.milliseconds
2524

2625
@OptIn(ExperimentalPowerSyncAPI::class)
2726
internal class InternalDatabaseImpl(
2827
private val pool: SQLiteConnectionPool,
2928
private val logger: Logger,
29+
dispatchStrategy: DispatchStrategy,
3030
) : InternalDatabase {
31-
// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
32-
private val dbContext = Dispatchers.IO
33-
3431
override suspend fun execute(
3532
sql: String,
3633
parameters: List<Any?>?,
@@ -39,8 +36,10 @@ internal class InternalDatabaseImpl(
3936
context.execute(sql, parameters)
4037
}
4138

39+
private val dispatch: DispatchFunction = dispatchStrategy.dispatchFunction
40+
4241
override suspend fun updateSchema(schemaJson: String) {
43-
withContext(dbContext) {
42+
dispatch {
4443
runWrapped {
4544
pool.withAllConnections { writer, readers ->
4645
writer.runTransaction { tx ->
@@ -167,7 +166,7 @@ internal class InternalDatabaseImpl(
167166
*/
168167
@OptIn(ExperimentalPowerSyncAPI::class)
169168
private suspend fun <R> internalReadLock(callback: suspend (SQLiteConnectionLease) -> R): R =
170-
withContext(dbContext) {
169+
dispatch {
171170
runWrapped {
172171
useConnection(true) { connection ->
173172
callback(connection)
@@ -189,7 +188,7 @@ internal class InternalDatabaseImpl(
189188

190189
@OptIn(ExperimentalPowerSyncAPI::class)
191190
private suspend fun <R> internalWriteLock(callback: suspend (SQLiteConnectionLease) -> R): R =
192-
withContext(dbContext) {
191+
dispatch {
193192
pool.write { writer ->
194193
runWrapped {
195194
callback(writer)

internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/pool/SwiftSQLiteConnectionPool.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.powersync.pool
22

33
import co.touchlab.kermit.Logger
4+
import com.powersync.DispatchFunction
5+
import com.powersync.DispatchStrategy
46
import com.powersync.PowerSyncDatabase
57
import com.powersync.db.driver.SQLiteConnectionLease
68
import com.powersync.db.driver.SQLiteConnectionPool
@@ -104,4 +106,12 @@ public fun openPowerSyncWithPool(
104106
schema = schema,
105107
identifier = identifier,
106108
logger = logger,
109+
dispatchStrategy = DispatchStrategy.Custom(
110+
object : DispatchFunction {
111+
override suspend fun <R> invoke(block: suspend () -> R): R {
112+
// We leave the dispatching up to the pool
113+
return block()
114+
}
115+
},
116+
),
107117
)

0 commit comments

Comments
 (0)