Skip to content

Commit 73c69de

Browse files
committed
fix wal test
1 parent 2351cdc commit 73c69de

3 files changed

Lines changed: 48 additions & 36 deletions

File tree

lib/wal/src/main/kotlin/de/quati/pgen/wal/PgenWalEventListener.kt

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import de.quati.pgen.wal.ReplicaIdentity.Companion.toSql
88
import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Dispatchers
1010
import kotlinx.coroutines.Job
11+
import kotlinx.coroutines.NonCancellable
1112
import kotlinx.coroutines.channels.BufferOverflow
1213
import kotlinx.coroutines.flow.MutableSharedFlow
1314
import kotlinx.coroutines.flow.SharedFlow
@@ -16,6 +17,7 @@ import kotlinx.coroutines.isActive
1617
import kotlinx.coroutines.launch
1718
import kotlinx.coroutines.sync.Mutex
1819
import kotlinx.coroutines.sync.withLock
20+
import kotlinx.coroutines.withContext
1921
import kotlinx.serialization.json.JsonObject
2022
import org.postgresql.PGConnection
2123
import org.postgresql.PGProperty
@@ -153,13 +155,19 @@ public class PgenWalEventListener private constructor(
153155
val currentJob = job ?: return@withLock
154156
currentJob.cancel()
155157
streamConnectionMutex.withLock {
156-
streamConnection?.unwrap(PGConnection::class.java)?.cancelQuery()
158+
runCatching {
159+
streamConnection?.unwrap(PGConnection::class.java)?.cancelQuery()
160+
}
157161
}
158-
currentJob.join()
159-
job = null
160-
streamConnectionMutex.withLock {
161-
streamConnection?.close()
162-
streamConnection = null
162+
withContext(NonCancellable) {
163+
currentJob.join()
164+
job = null
165+
streamConnectionMutex.withLock {
166+
runCatching {
167+
streamConnection?.close()
168+
}
169+
streamConnection = null
170+
}
163171
}
164172
}
165173
}

tests/jdbc-wal/src/test/kotlin/WalTest.kt

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -147,22 +147,24 @@ class WalTest {
147147
action: suspend () -> Unit,
148148
): List<WalEvent.Change<*>> {
149149
this@testCollect.start(recreateSlot = recreateSlot)
150-
val readyFlag = ReadyFlag()
151-
val job = scope.async {
152-
this@testCollect.flow
153-
.onSubscription { readyFlag.markReady() }
154-
.takeWhile { (it as? WalEvent.Message)?.content != stopMsg }
155-
.map {
156-
(it as? WalEvent.Change<*>)
157-
?: error("Expected WalEvent.Change, got $it")
158-
}
159-
.toList()
150+
try {
151+
val readyFlag = ReadyFlag()
152+
val job = scope.async {
153+
this@testCollect.flow
154+
.onSubscription { readyFlag.markReady() }
155+
.takeWhile { (it as? WalEvent.Message)?.content != stopMsg }
156+
.map {
157+
(it as? WalEvent.Change<*>)
158+
?: error("Expected WalEvent.Change, got $it")
159+
}
160+
.toList()
161+
}
162+
readyFlag.awaitReady()
163+
action()
164+
return job.await()
165+
} finally {
166+
this@testCollect.stop()
160167
}
161-
readyFlag.awaitReady()
162-
action()
163-
val result = job.await()
164-
this@testCollect.stop()
165-
return result
166168
}
167169

168170
@Test

tests/r2dbc-wal/src/test/kotlin/WalTest.kt

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -147,22 +147,24 @@ class WalTest {
147147
action: suspend () -> Unit,
148148
): List<WalEvent.Change<*>> {
149149
this@testCollect.start(recreateSlot = recreateSlot)
150-
val readyFlag = ReadyFlag()
151-
val job = scope.async {
152-
this@testCollect.flow
153-
.onSubscription { readyFlag.markReady() }
154-
.takeWhile { (it as? WalEvent.Message)?.content != stopMsg }
155-
.map {
156-
(it as? WalEvent.Change<*>)
157-
?: error("Expected WalEvent.Change, got $it")
158-
}
159-
.toList()
150+
try {
151+
val readyFlag = ReadyFlag()
152+
val job = scope.async {
153+
this@testCollect.flow
154+
.onSubscription { readyFlag.markReady() }
155+
.takeWhile { (it as? WalEvent.Message)?.content != stopMsg }
156+
.map {
157+
(it as? WalEvent.Change<*>)
158+
?: error("Expected WalEvent.Change, got $it")
159+
}
160+
.toList()
161+
}
162+
readyFlag.awaitReady()
163+
action()
164+
return job.await()
165+
} finally {
166+
this@testCollect.stop()
160167
}
161-
readyFlag.awaitReady()
162-
action()
163-
val result = job.await()
164-
this@testCollect.stop()
165-
return result
166168
}
167169

168170
@Test

0 commit comments

Comments
 (0)