Skip to content

Commit 7f58185

Browse files
authored
Refactor fetching credentials (#220)
1 parent 6cb4834 commit 7f58185

File tree

7 files changed

+127
-61
lines changed

7 files changed

+127
-61
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* Support tables created outside of PowerSync with the `RawTable` API.
66
For more information, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables).
77
* Fix `runWrapped` catching cancellation exceptions.
8+
* Fix errors in `PowerSyncBackendConnector.fetchCredentials()` crashing Android apps.
89

910
## 1.2.2
1011

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import com.powersync.bucket.OpType
1313
import com.powersync.bucket.OplogEntry
1414
import com.powersync.bucket.WriteCheckpointData
1515
import com.powersync.bucket.WriteCheckpointResponse
16+
import com.powersync.connectors.PowerSyncBackendConnector
17+
import com.powersync.connectors.PowerSyncCredentials
1618
import com.powersync.db.PowerSyncDatabaseImpl
1719
import com.powersync.db.schema.PendingStatement
1820
import com.powersync.db.schema.PendingStatementParameter
@@ -23,24 +25,20 @@ import com.powersync.testutils.databaseTest
2325
import com.powersync.testutils.waitFor
2426
import com.powersync.utils.JsonParam
2527
import com.powersync.utils.JsonUtil
26-
import dev.mokkery.answering.returns
27-
import dev.mokkery.every
28-
import dev.mokkery.verify
29-
import dev.mokkery.verifyNoMoreCalls
30-
import dev.mokkery.verifySuspend
3128
import io.kotest.matchers.collections.shouldHaveSingleElement
3229
import io.kotest.matchers.collections.shouldHaveSize
3330
import io.kotest.matchers.shouldBe
31+
import io.kotest.matchers.shouldNotBe
32+
import io.kotest.matchers.string.shouldContain
3433
import kotlinx.coroutines.CompletableDeferred
3534
import kotlinx.coroutines.DelicateCoroutinesApi
36-
import kotlinx.coroutines.launch
37-
import kotlinx.serialization.encodeToString
3835
import kotlinx.serialization.json.jsonObject
3936
import kotlinx.serialization.json.jsonPrimitive
4037
import kotlin.test.Test
4138
import kotlin.test.assertEquals
4239
import kotlin.test.assertFailsWith
4340
import kotlin.test.assertNotNull
41+
import kotlin.test.fail
4442
import kotlin.time.Duration.Companion.seconds
4543

4644
@OptIn(LegacySyncImplementation::class)
@@ -116,6 +114,7 @@ abstract class BaseSyncIntegrationTest(
116114
turbineScope(timeout = 10.0.seconds) {
117115
val turbine = database.currentStatus.asFlow().testIn(this)
118116
turbine.waitFor { it.connected }
117+
connector.cachedCredentials shouldNotBe null
119118

120119
database.disconnect()
121120
turbine.waitFor { !it.connected }
@@ -126,7 +125,7 @@ abstract class BaseSyncIntegrationTest(
126125
waitFor { syncLines.isClosedForSend shouldBe true }
127126

128127
// And called invalidateCredentials on the connector
129-
verify { connector.invalidateCredentials() }
128+
connector.cachedCredentials shouldBe null
130129
}
131130

132131
@Test
@@ -630,6 +629,12 @@ abstract class BaseSyncIntegrationTest(
630629
@Test
631630
fun testTokenExpired() =
632631
databaseTest {
632+
var fetchCredentialsCalls = 0
633+
connector.fetchCredentialsCallback = {
634+
fetchCredentialsCalls++
635+
TestConnector.testCredentials
636+
}
637+
633638
turbineScope(timeout = 10.0.seconds) {
634639
val turbine = database.currentStatus.asFlow().testIn(this)
635640

@@ -638,13 +643,48 @@ abstract class BaseSyncIntegrationTest(
638643

639644
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
640645
turbine.waitFor { it.connected }
641-
verifySuspend { connector.getCredentialsCached() }
642-
verifyNoMoreCalls(connector)
646+
fetchCredentialsCalls shouldBe 1
643647

644648
// Should invalidate credentials when token expires
645649
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 0))
646650
turbine.waitFor { !it.connected }
647-
verify { connector.invalidateCredentials() }
651+
connector.cachedCredentials shouldBe null
652+
653+
turbine.cancel()
654+
}
655+
}
656+
657+
@Test
658+
fun testTokenThrows() =
659+
databaseTest {
660+
// Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/219
661+
var attempt = 0
662+
val connector =
663+
object : PowerSyncBackendConnector() {
664+
override suspend fun fetchCredentials(): PowerSyncCredentials? {
665+
attempt++
666+
if (attempt == 1) {
667+
fail("Expected exception from fetchCredentials")
668+
}
669+
670+
return TestConnector.testCredentials
671+
}
672+
673+
override suspend fun uploadData(database: PowerSyncDatabase) {
674+
fail("Not implemented: uploadData")
675+
}
676+
}
677+
678+
turbineScope(timeout = 10.0.seconds) {
679+
val turbine = database.currentStatus.asFlow().testIn(this)
680+
681+
database.connect(connector, 1000L, retryDelayMs = 5000, options = options)
682+
turbine.waitFor { it.downloadError != null }
683+
684+
database.currentStatus.downloadError?.toString() shouldContain "Expected exception from fetchCredentials"
685+
686+
// Should retry, and the second fetchCredentials call will work
687+
turbine.waitFor { it.connected }
648688

649689
turbine.cancel()
650690
}
@@ -662,10 +702,21 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
662702
databaseTest {
663703
val prefetchCalled = CompletableDeferred<Unit>()
664704
val completePrefetch = CompletableDeferred<Unit>()
665-
every { connector.prefetchCredentials() } returns
666-
scope.launch {
667-
prefetchCalled.complete(Unit)
668-
completePrefetch.await()
705+
var fetchCredentialsCount = 0
706+
707+
val connector =
708+
object : PowerSyncBackendConnector() {
709+
override suspend fun fetchCredentials(): PowerSyncCredentials? {
710+
fetchCredentialsCount++
711+
if (fetchCredentialsCount == 2) {
712+
prefetchCalled.complete(Unit)
713+
completePrefetch.await()
714+
}
715+
716+
return TestConnector.testCredentials
717+
}
718+
719+
override suspend fun uploadData(database: PowerSyncDatabase) {}
669720
}
670721

671722
turbineScope(timeout = 10.0.seconds) {
@@ -676,8 +727,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
676727

677728
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
678729
turbine.waitFor { it.connected }
679-
verifySuspend { connector.getCredentialsCached() }
680-
verifyNoMoreCalls(connector)
730+
fetchCredentialsCount shouldBe 1
681731

682732
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10))
683733
prefetchCalled.await()
@@ -689,6 +739,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
689739
turbine.waitFor { !it.connected }
690740

691741
turbine.waitFor { it.connected }
742+
fetchCredentialsCount shouldBe 2
692743
turbine.cancel()
693744
}
694745
}

core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,15 @@ import co.touchlab.kermit.Severity
99
import co.touchlab.kermit.TestConfig
1010
import co.touchlab.kermit.TestLogWriter
1111
import com.powersync.DatabaseDriverFactory
12+
import com.powersync.TestConnector
1213
import com.powersync.bucket.WriteCheckpointData
1314
import com.powersync.bucket.WriteCheckpointResponse
14-
import com.powersync.connectors.PowerSyncBackendConnector
15-
import com.powersync.connectors.PowerSyncCredentials
1615
import com.powersync.createPowerSyncDatabaseImpl
1716
import com.powersync.db.PowerSyncDatabaseImpl
1817
import com.powersync.db.schema.Schema
1918
import com.powersync.sync.LegacySyncImplementation
2019
import com.powersync.sync.SyncLine
2120
import com.powersync.utils.JsonUtil
22-
import dev.mokkery.answering.returns
23-
import dev.mokkery.everySuspend
24-
import dev.mokkery.mock
2521
import io.ktor.client.HttpClient
2622
import io.ktor.client.HttpClientConfig
2723
import io.ktor.client.engine.mock.toByteArray
@@ -103,16 +99,7 @@ internal class ActiveDatabaseTest(
10399
"db-$suffix"
104100
}
105101

106-
var connector =
107-
mock<PowerSyncBackendConnector> {
108-
everySuspend { getCredentialsCached() } returns
109-
PowerSyncCredentials(
110-
token = "test-token",
111-
endpoint = "https://test.com",
112-
)
113-
114-
everySuspend { invalidateCredentials() } returns Unit
115-
}
102+
var connector = TestConnector()
116103

117104
fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl {
118105
logger.d { "Opening database $databaseName in directory $testDirectory" }

core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import kotlinx.coroutines.Dispatchers
88
import kotlinx.coroutines.Job
99
import kotlinx.coroutines.SupervisorJob
1010
import kotlinx.coroutines.launch
11+
import kotlinx.coroutines.sync.Mutex
12+
import kotlinx.coroutines.sync.withLock
1113
import kotlin.coroutines.cancellation.CancellationException
1214

1315
/**
@@ -19,10 +21,17 @@ import kotlin.coroutines.cancellation.CancellationException
1921
*
2022
*/
2123
public abstract class PowerSyncBackendConnector {
22-
private var cachedCredentials: PowerSyncCredentials? = null
24+
internal var cachedCredentials: PowerSyncCredentials? = null
25+
private var fetchingCredentials = Mutex()
26+
2327
private var fetchRequest: Job? = null
2428
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
2529

30+
private suspend fun fetchAndCacheCredentials(): PowerSyncCredentials? =
31+
fetchCredentials().also {
32+
cachedCredentials = it
33+
}
34+
2635
/**
2736
* Get credentials current cached, or fetch new credentials if none are
2837
* available.
@@ -33,8 +42,15 @@ public abstract class PowerSyncBackendConnector {
3342
public open suspend fun getCredentialsCached(): PowerSyncCredentials? {
3443
return runWrapped {
3544
cachedCredentials?.let { return@runWrapped it }
36-
prefetchCredentials().join()
37-
cachedCredentials
45+
46+
return fetchingCredentials.withLock {
47+
// With concurrent calls, it's possible that credentials have just been fetched.
48+
cachedCredentials?.let { return it }
49+
50+
val credentials = fetchAndCacheCredentials()
51+
check(credentials === cachedCredentials)
52+
credentials
53+
}
3854
}
3955
}
4056

@@ -55,22 +71,39 @@ public abstract class PowerSyncBackendConnector {
5571
*
5672
* This may be called before the current credentials have expired.
5773
*/
58-
@Throws(PowerSyncException::class, CancellationException::class)
74+
@Deprecated(
75+
"Call updateCredentials, bring your own CoroutineScope if you need it to be asynchronous",
76+
replaceWith = ReplaceWith("updateCredentials"),
77+
)
5978
public open fun prefetchCredentials(): Job {
6079
fetchRequest?.takeIf { it.isActive }?.let { return it }
6180

6281
val request =
6382
scope.launch {
64-
fetchCredentials().also { value ->
65-
cachedCredentials = value
66-
fetchRequest = null
67-
}
83+
fetchAndCacheCredentials().also { fetchRequest = null }
6884
}
6985

7086
fetchRequest = request
7187
return request
7288
}
7389

90+
/**
91+
* If no other task is currently fetching credentials, calls [fetchCredentials] again and caches
92+
* the result internally.
93+
*
94+
* This is used by the sync client if a token is about to expire: By fetching a new token early,
95+
* we can avoid interruptions in the sync process.
96+
*/
97+
public suspend fun updateCredentials() {
98+
if (fetchingCredentials.tryLock()) {
99+
try {
100+
fetchAndCacheCredentials()
101+
} finally {
102+
fetchingCredentials.unlock()
103+
}
104+
}
105+
}
106+
74107
/**
75108
* Get credentials for PowerSync.
76109
*

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ internal class SyncStream(
400400
if (credentialsInvalidation == null) {
401401
val job =
402402
scope.launch {
403-
connector.prefetchCredentials().join()
403+
connector.updateCredentials()
404404
logger.v { "Stopping because new credentials are available" }
405405

406406
// Token has been refreshed, start another iteration

core/src/commonTest/kotlin/com/powersync/TestConnector.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import com.powersync.connectors.PowerSyncCredentials
55

66
class TestConnector : PowerSyncBackendConnector() {
77
var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = {
8-
PowerSyncCredentials(
9-
token = "test-token",
10-
endpoint = "https://test.com",
11-
)
8+
testCredentials
129
}
1310
var uploadDataCallback: suspend (PowerSyncDatabase) -> Unit = {
1411
val tx = it.getNextCrudTransaction()
@@ -20,4 +17,12 @@ class TestConnector : PowerSyncBackendConnector() {
2017
override suspend fun uploadData(database: PowerSyncDatabase) {
2118
uploadDataCallback(database)
2219
}
20+
21+
companion object {
22+
val testCredentials =
23+
PowerSyncCredentials(
24+
token = "test-token",
25+
endpoint = "https://powersynctest.example.com",
26+
)
27+
}
2328
}

core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import co.touchlab.kermit.Severity
66
import co.touchlab.kermit.TestConfig
77
import co.touchlab.kermit.TestLogWriter
88
import com.powersync.ExperimentalPowerSyncAPI
9+
import com.powersync.TestConnector
910
import com.powersync.bucket.BucketStorage
1011
import com.powersync.connectors.PowerSyncBackendConnector
11-
import com.powersync.connectors.PowerSyncCredentials
1212
import com.powersync.db.crud.CrudEntry
1313
import com.powersync.db.crud.UpdateType
1414
import com.powersync.db.schema.Schema
1515
import dev.mokkery.answering.returns
1616
import dev.mokkery.everySuspend
1717
import dev.mokkery.mock
18-
import dev.mokkery.verify
18+
import io.kotest.matchers.shouldBe
1919
import io.ktor.client.HttpClient
2020
import io.ktor.client.engine.mock.MockEngine
2121
import kotlinx.coroutines.delay
@@ -55,24 +55,12 @@ class SyncStreamTest {
5555
mock<BucketStorage> {
5656
everySuspend { getClientId() } returns "test-client-id"
5757
}
58-
connector =
59-
mock<PowerSyncBackendConnector> {
60-
everySuspend { getCredentialsCached() } returns
61-
PowerSyncCredentials(
62-
token = "test-token",
63-
endpoint = "https://test.com",
64-
)
65-
}
58+
connector = TestConnector()
6659
}
6760

6861
@Test
6962
fun testInvalidateCredentials() =
7063
runTest {
71-
connector =
72-
mock<PowerSyncBackendConnector> {
73-
everySuspend { invalidateCredentials() } returns Unit
74-
}
75-
7664
syncStream =
7765
SyncStream(
7866
bucketStorage = bucketStorage,
@@ -86,8 +74,9 @@ class SyncStreamTest {
8674
schema = Schema(),
8775
)
8876

77+
connector.cachedCredentials = TestConnector.testCredentials
8978
syncStream.invalidateCredentials()
90-
verify { connector.invalidateCredentials() }
79+
connector.cachedCredentials shouldBe null
9180
}
9281

9382
// TODO: Work on improving testing this without needing to test the logs are displayed

0 commit comments

Comments
 (0)