Skip to content

Commit d9dc982

Browse files
committed
Fix client streams
1 parent 9931e2f commit d9dc982

File tree

11 files changed

+97
-62
lines changed

11 files changed

+97
-62
lines changed

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import kotlinx.atomicfu.atomic
88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.Channel
1010
import kotlinx.coroutines.flow.Flow
11+
import kotlinx.coroutines.flow.FlowCollector
1112
import kotlinx.coroutines.flow.first
1213
import kotlinx.coroutines.flow.flow
14+
import kotlinx.coroutines.sync.Mutex
15+
import kotlinx.coroutines.sync.withLock
1316
import kotlinx.rpc.RpcCall
1417
import kotlinx.rpc.RpcClient
1518
import kotlinx.rpc.annotations.Rpc
@@ -33,6 +36,7 @@ import kotlinx.serialization.modules.SerializersModule
3336
import kotlin.collections.first
3437
import kotlin.coroutines.CoroutineContext
3538
import kotlin.coroutines.cancellation.CancellationException
39+
import kotlin.time.Duration.Companion.seconds
3640

3741
@Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR)
3842
public typealias KRPCClient = KrpcClient
@@ -58,7 +62,7 @@ public typealias KRPCClient = KrpcClient
5862
public abstract class KrpcClient(
5963
private val config: KrpcConfig.Client,
6064
transport: KrpcTransport,
61-
): RpcClient, KrpcEndpoint {
65+
) : RpcClient, KrpcEndpoint {
6266
// we make a child here, so we can send cancellation messages before closing the connection
6367
final override val coroutineContext: CoroutineContext = SupervisorJob(transport.coroutineContext.job)
6468

@@ -183,26 +187,22 @@ public abstract class KrpcClient(
183187
}
184188

185189
coroutineScope {
186-
launch {
190+
val clientStreamsJob = launch(CoroutineName("client-stream-root-${call.serviceId}-$callId")) {
187191
supervisorScope {
188192
clientStreamContext.streams[callId].orEmpty().forEach {
189-
launch {
193+
launch(CoroutineName("client-stream-${call.serviceId}-$callId-${it.streamId}")) {
190194
handleOutgoingStream(it, serialFormat, call.descriptor.fqName)
191195
}
192196
}
193197
}
198+
println("finished client streams job")
194199
}
195200

196-
while (true) {
197-
val element = channel.receiveCatching()
198-
if (element.isClosed) {
199-
val ex = element.exceptionOrNull() ?: break
200-
throw ex
201-
}
202-
203-
if (!element.isFailure) {
204-
emit(element.getOrThrow())
205-
}
201+
try {
202+
consumeAndEmitServerMessages(channel)
203+
} finally {
204+
clientStreamsJob.cancelAndJoin()
205+
clientStreamContext.streams.remove(callId)
206206
}
207207
}
208208
} catch (e: CancellationException) {
@@ -217,6 +217,20 @@ public abstract class KrpcClient(
217217
}
218218
}
219219

220+
private suspend fun <T> FlowCollector<T>.consumeAndEmitServerMessages(channel: Channel<T>) {
221+
while (true) {
222+
val element = channel.receiveCatching()
223+
if (element.isClosed) {
224+
val ex = element.exceptionOrNull() ?: break
225+
throw ex
226+
}
227+
228+
if (!element.isFailure) {
229+
emit(element.getOrThrow())
230+
}
231+
}
232+
}
233+
220234
private suspend fun <T, @Rpc R : Any> handleServerStreamingMessage(
221235
message: KrpcCallMessage,
222236
channel: Channel<T>,
@@ -388,6 +402,7 @@ public abstract class KrpcClient(
388402
outgoingStream: StreamCall,
389403
) {
390404
flow.collect {
405+
println("collected: $it, ${outgoingStream.streamId}")
391406
val message = when (serialFormat) {
392407
is StringFormat -> {
393408
val stringData = serialFormat.encodeToString(outgoingStream.elementSerializer, it)

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ internal class KrpcServerService<@Rpc T : Any>(
202202

203203
if (failure != null) {
204204
cancelRequest(callId, "Server request failed", failure, fromJob = true)
205+
} else {
206+
cancelRequest(callId, fromJob = true)
205207
}
206208
}
207209

@@ -218,9 +220,7 @@ internal class KrpcServerService<@Rpc T : Any>(
218220
) {
219221
val result = when (serialFormat) {
220222
is StringFormat -> {
221-
val stringValue = serverStreamContext.scoped(callData.callId) {
222-
serialFormat.encodeToString(returnSerializer, value)
223-
}
223+
val stringValue = serialFormat.encodeToString(returnSerializer, value)
224224
KrpcCallMessage.CallSuccessString(
225225
callId = callData.callId,
226226
serviceType = descriptor.fqName,
@@ -231,9 +231,7 @@ internal class KrpcServerService<@Rpc T : Any>(
231231
}
232232

233233
is BinaryFormat -> {
234-
val binaryValue = serverStreamContext.scoped(callData.callId) {
235-
serialFormat.encodeToByteArray(returnSerializer, value)
236-
}
234+
val binaryValue = serialFormat.encodeToByteArray(returnSerializer, value)
237235
KrpcCallMessage.CallSuccessBinary(
238236
callId = callData.callId,
239237
serviceType = descriptor.fqName,
@@ -261,9 +259,7 @@ internal class KrpcServerService<@Rpc T : Any>(
261259
flow.collect { value ->
262260
val result = when (serialFormat) {
263261
is StringFormat -> {
264-
val stringValue = serverStreamContext.scoped(callData.callId) {
265-
serialFormat.encodeToString(returnSerializer, value)
266-
}
262+
val stringValue = serialFormat.encodeToString(returnSerializer, value)
267263
KrpcCallMessage.StreamMessageString(
268264
callId = callData.callId,
269265
serviceType = descriptor.fqName,
@@ -275,9 +271,7 @@ internal class KrpcServerService<@Rpc T : Any>(
275271
}
276272

277273
is BinaryFormat -> {
278-
val binaryValue = serverStreamContext.scoped(callData.callId) {
279-
serialFormat.encodeToByteArray(returnSerializer, value)
280-
}
274+
val binaryValue = serialFormat.encodeToByteArray(returnSerializer, value)
281275
KrpcCallMessage.StreamMessageBinary(
282276
callId = callData.callId,
283277
serviceType = descriptor.fqName,
@@ -328,6 +322,7 @@ internal class KrpcServerService<@Rpc T : Any>(
328322
cause: Throwable? = null,
329323
fromJob: Boolean = false,
330324
) {
325+
serverStreamContext.removeCall(callId, cause)
331326
requestMap.remove(callId)?.cancelAndClose(callId, message, cause, fromJob)
332327

333328
// acknowledge the cancellation

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ internal class ServerStreamContext {
5555
fun prepareClientStream(streamId: String, elementKind: KSerializer<Any?>): Flow<Any?> {
5656
val callId = currentCallId ?: error("No call id")
5757

58-
val channel = Channel<Any?>()
58+
val channel = Channel<Any?>(Channel.UNLIMITED)
5959

6060
@Suppress("UNCHECKED_CAST")
6161
val map = streams.computeIfAbsent(callId) { RpcInternalConcurrentHashMap() }
@@ -68,6 +68,7 @@ internal class ServerStreamContext {
6868

6969
val flow = flow {
7070
for (message in channel) {
71+
println("Consumed on server: $message")
7172
when (message) {
7273
is StreamCancel -> {
7374
onClose()

krpc/krpc-test/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ kotlin {
2828
api(projects.krpc.krpcCore)
2929
api(projects.krpc.krpcServer)
3030
api(projects.krpc.krpcClient)
31+
api(projects.krpc.krpcLogging)
32+
33+
implementation(libs.coroutines.debug)
3134

3235
implementation(projects.krpc.krpcSerialization.krpcSerializationJson)
3336

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ interface KrpcTestService : RemoteService {
7171
): String
7272

7373
suspend fun incomingStreamSyncCollect(arg1: Flow<String>): Int
74-
suspend fun incomingStreamAsyncCollect(arg1: Flow<String>): Int
74+
suspend fun incomingStreamSyncCollectMultiple(arg1: Flow<String>, arg2: Flow<String>, arg3: Flow<String>): Int
7575
fun outgoingStream(): Flow<String>
7676
fun bidirectionalStream(arg1: Flow<String>): Flow<String>
7777
fun echoStream(arg1: Flow<Int>): Flow<Int>

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.kt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,12 @@ class KrpcTestServiceBackend(override val coroutineContext: CoroutineContext) :
129129
return arg1.count()
130130
}
131131

132-
val incomingStreamAsyncCollectLatch = CompletableDeferred<Unit>()
133-
134-
@OptIn(DelicateCoroutinesApi::class)
135-
override suspend fun incomingStreamAsyncCollect(arg1: Flow<String>): Int {
136-
@Suppress("detekt.GlobalCoroutineUsage")
137-
GlobalScope.launch {
138-
assertContentEquals(listOf("test1", "test2", "test3"), arg1.toList())
139-
incomingStreamAsyncCollectLatch.complete(Unit)
140-
}
141-
return 5
132+
override suspend fun incomingStreamSyncCollectMultiple(
133+
arg1: Flow<String>,
134+
arg2: Flow<String>,
135+
arg3: Flow<String>,
136+
): Int {
137+
return arg1.count() + arg2.count() + arg3.count()
142138
}
143139

144140
override fun outgoingStream(): Flow<String> {
@@ -238,3 +234,5 @@ class KrpcTestServiceBackend(override val coroutineContext: CoroutineContext) :
238234
}
239235

240236
internal expect fun runThreadIfPossible(runner: () -> Unit)
237+
238+
internal expect fun CoroutineScope.debugCoroutines()

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ abstract class KrpcTransportTestBase {
132132
expected = List(10) { it * 2 },
133133
actual = client.nonSuspendBidirectional(List(10) { it }.asFlow()).toList(),
134134
)
135-
print(1)
136135
}
137136

138137
@Test
@@ -259,13 +258,14 @@ abstract class KrpcTransportTestBase {
259258
}
260259

261260
@Test
262-
@Ignore // todo async vs sync client streaming
263-
fun incomingStreamAsyncCollect() = runTest {
264-
val result = client.incomingStreamAsyncCollect(flowOf("test1", "test2", "test3")).also {
265-
server.incomingStreamAsyncCollectLatch.await()
266-
}
261+
fun incomingStreamSyncCollectMultiple() = runTest {
262+
val result = client.incomingStreamSyncCollectMultiple(
263+
flowOf("test1", "test2", "test3"),
264+
flowOf("test1", "test2", "test3"),
265+
flowOf("test1", "test2", "test3"),
266+
)
267267

268-
assertEquals(5, result)
268+
assertEquals(9, result)
269269
}
270270

271271
@Test
@@ -290,20 +290,9 @@ abstract class KrpcTransportTestBase {
290290
}
291291

292292
@Test
293-
@Ignore // todo async vs sync client streaming
294-
fun bidirectionalAsyncStream() = runTest {
295-
val flow = MutableSharedFlow<Int>(1)
296-
val result = client.echoStream(flow.take(10))
297-
launch {
298-
var id = 0
299-
result.collect {
300-
assertEquals(id, it)
301-
id++
302-
flow.emit(id)
303-
}
304-
}
305-
306-
flow.emit(0)
293+
fun bidirectionalEchoStream() = runTest {
294+
val result = client.echoStream(flowOf(1, 2, 3)).toList().sum()
295+
assertEquals(6, result)
307296
}
308297

309298
@Test
@@ -331,15 +320,15 @@ abstract class KrpcTransportTestBase {
331320
try {
332321
client.throwsIllegalArgument("me")
333322
fail("Exception expected: throwsIllegalArgument")
334-
} catch (e : AssertionError) {
323+
} catch (e: AssertionError) {
335324
throw e
336325
} catch (e: Throwable) {
337326
assertEquals("me", e.message)
338327
}
339328
try {
340329
client.throwsSerializableWithMessageAndCause("me")
341330
fail("Exception expected: throwsSerializableWithMessageAndCause")
342-
} catch (e : AssertionError) {
331+
} catch (e: AssertionError) {
343332
throw e
344333
} catch (e: Throwable) {
345334
assertEquals("me", e.message)
@@ -348,15 +337,15 @@ abstract class KrpcTransportTestBase {
348337
try {
349338
client.throwsThrowable("me")
350339
fail("Exception expected: throwsThrowable")
351-
} catch (e : AssertionError) {
340+
} catch (e: AssertionError) {
352341
throw e
353342
} catch (e: Throwable) {
354343
assertEquals("me", e.message)
355344
}
356345
try {
357346
client.throwsUNSTOPPABLEThrowable("me")
358347
fail("Exception expected: throwsUNSTOPPABLEThrowable")
359-
} catch (e : AssertionError) {
348+
} catch (e: AssertionError) {
360349
throw e
361350
} catch (e: Throwable) {
362351
assertEquals("me", e.message)

krpc/krpc-test/src/jsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.js.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
package kotlinx.rpc.krpc.test
66

7+
import kotlinx.coroutines.CoroutineScope
8+
79
actual inline fun runThreadIfPossible(runner: () -> Unit) {
810
runner()
911
}
12+
13+
internal actual fun CoroutineScope.debugCoroutines() {
14+
}

krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.jvm.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,26 @@
44

55
package kotlinx.rpc.krpc.test
66

7+
import kotlinx.coroutines.CoroutineScope
8+
import kotlinx.coroutines.Dispatchers
9+
import kotlinx.coroutines.ExperimentalCoroutinesApi
10+
import kotlinx.coroutines.debug.DebugProbes
11+
import kotlinx.coroutines.delay
12+
import kotlinx.coroutines.launch
13+
import kotlinx.coroutines.withContext
14+
import kotlin.time.Duration.Companion.seconds
15+
716
actual fun runThreadIfPossible(runner: () -> Unit) {
817
Thread(runner).start()
918
}
19+
20+
@OptIn(ExperimentalCoroutinesApi::class)
21+
internal actual fun CoroutineScope.debugCoroutines() {
22+
DebugProbes.install()
23+
launch {
24+
withContext(Dispatchers.IO) {
25+
delay(10.seconds)
26+
}
27+
DebugProbes.dumpCoroutines()
28+
}
29+
}

krpc/krpc-test/src/nativeMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.native.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44

55
package kotlinx.rpc.krpc.test
66

7+
import kotlinx.coroutines.CoroutineScope
78
import kotlin.native.concurrent.ObsoleteWorkersApi
89
import kotlin.native.concurrent.Worker
910

1011
@OptIn(ObsoleteWorkersApi::class)
1112
actual fun runThreadIfPossible(runner: () -> Unit) {
1213
Worker.start(errorReporting = true).executeAfter(0L, runner)
1314
}
15+
16+
internal actual fun CoroutineScope.debugCoroutines() {
17+
}

krpc/krpc-test/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestServiceBackend.wasmJs.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
package kotlinx.rpc.krpc.test
66

7+
import kotlinx.coroutines.CoroutineScope
8+
79
actual inline fun runThreadIfPossible(runner: () -> Unit) {
810
runner()
911
}
12+
13+
internal actual fun CoroutineScope.debugCoroutines() {
14+
}

0 commit comments

Comments
 (0)