Skip to content

Commit 1ed1633

Browse files
committed
Experimental support for: KRPC-125 Manual stream scope management
1 parent b946964 commit 1ed1633

File tree

2 files changed

+113
-17
lines changed

2 files changed

+113
-17
lines changed

krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/StreamScope.kt

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import kotlin.contracts.InvocationKind
1515
import kotlin.contracts.contract
1616
import kotlin.coroutines.CoroutineContext
1717
import kotlin.coroutines.coroutineContext
18+
import kotlin.js.JsName
1819

1920
/**
2021
* Stream scope handles all RPC streams that are launched inside it.
@@ -26,33 +27,46 @@ import kotlin.coroutines.coroutineContext
2627
* Stream scope is a child of the [CoroutineContext] it was created in.
2728
* Failure of one request will not cancel all streams in the others.
2829
*/
29-
@InternalRPCApi
3030
@OptIn(InternalCoroutinesApi::class)
31-
public class StreamScope(
31+
public class StreamScope internal constructor(
3232
parentContext: CoroutineContext,
3333
internal val role: Role,
34-
) : CoroutineContext.Element, AutoCloseable {
35-
internal companion object Key : CoroutineContext.Key<StreamScope>
34+
): AutoCloseable {
35+
internal class Element(internal val scope: StreamScope) : CoroutineContext.Element {
36+
override val key: CoroutineContext.Key<Element> = Key
37+
38+
internal companion object Key : CoroutineContext.Key<Element>
39+
}
3640

37-
override val key: CoroutineContext.Key<StreamScope> = Key
41+
internal val contextElement = Element(this)
3842

3943
private val scopeJob = SupervisorJob(parentContext.job)
4044

4145
private val requests = ConcurrentHashMap<String, CoroutineScope>()
4246

47+
init {
48+
scopeJob.invokeOnCompletion {
49+
close()
50+
}
51+
}
52+
53+
@InternalRPCApi
4354
public fun onScopeCompletion(handler: (Throwable?) -> Unit) {
4455
scopeJob.invokeOnCompletion(handler)
4556
}
4657

58+
@InternalRPCApi
4759
public fun onScopeCompletion(callId: String, handler: (Throwable?) -> Unit) {
4860
getRequestScope(callId).coroutineContext.job.invokeOnCompletion(onCancelling = true, handler = handler)
4961
}
5062

63+
@InternalRPCApi
5164
public fun cancelRequestScopeById(callId: String, message: String, cause: Throwable?): Job? {
5265
return requests.remove(callId)?.apply { cancel(message, cause) }?.coroutineContext?.job
5366
}
5467

5568
// Group stream launches by callId. In case one fails, so do others
69+
@InternalRPCApi
5670
public fun launch(callId: String, block: suspend CoroutineScope.() -> Unit): Job {
5771
return getRequestScope(callId).launch(block = block)
5872
}
@@ -86,19 +100,19 @@ public fun CoroutineContext.withServerStreamScope(): CoroutineContext = withStre
86100

87101
@OptIn(InternalCoroutinesApi::class)
88102
internal fun CoroutineContext.withStreamScope(role: StreamScope.Role): CoroutineContext {
89-
return this + StreamScope(this, role).apply {
90-
this@withStreamScope.job.invokeOnCompletion(onCancelling = true) { close() }
103+
return this + StreamScope(this, role).contextElement.apply {
104+
this@withStreamScope.job.invokeOnCompletion(onCancelling = true) { scope.close() }
91105
}
92106
}
93107

94108
@InternalRPCApi
95109
public suspend fun streamScopeOrNull(): StreamScope? {
96-
return currentCoroutineContext()[StreamScope.Key]
110+
return currentCoroutineContext()[StreamScope.Element.Key]?.scope
97111
}
98112

99113
@InternalRPCApi
100114
public fun streamScopeOrNull(scope: CoroutineScope): StreamScope? {
101-
return scope.coroutineContext[StreamScope.Key]
115+
return scope.coroutineContext[StreamScope.Element.Key]?.scope
102116
}
103117

104118
internal fun noStreamScopeError(): Nothing {
@@ -165,22 +179,53 @@ public suspend fun <T> streamScoped(block: suspend CoroutineScope.() -> T): T {
165179
}
166180

167181
val context = currentCoroutineContext()
182+
.apply {
183+
checkContextForStreamScope()
184+
}
185+
186+
val streamScope = StreamScope(context, StreamScope.Role.Client)
187+
188+
return withContext(streamScope.contextElement) {
189+
streamScope.use {
190+
block()
191+
}
192+
}
193+
}
168194

169-
if (context[StreamScope.Key] != null) {
195+
private fun CoroutineContext.checkContextForStreamScope() {
196+
if (this[StreamScope.Element] != null) {
170197
error(
171198
"One of the following caused a failure: \n" +
172-
"- nested 'streamScoped' calls are not allowed.\n" +
173-
"- 'streamScoped' calls are not allowed in server RPC services."
199+
"- nested 'streamScoped' or `withStreamScope` calls are not allowed.\n" +
200+
"- 'streamScoped' or `withStreamScope` calls are not allowed in server RPC services."
174201
)
175202
}
203+
}
176204

177-
val streamScope = StreamScope(context, StreamScope.Role.Client)
205+
/**
206+
* Creates a [StreamScope] entity for manual stream management.
207+
*/
208+
@JsName("StreamScope_fun")
209+
@ExperimentalRPCApi
210+
public fun StreamScope(parent: CoroutineContext): StreamScope {
211+
parent.checkContextForStreamScope()
178212

179-
return withContext(streamScope) {
180-
streamScope.use {
181-
block()
182-
}
213+
return StreamScope(parent, StreamScope.Role.Client)
214+
}
215+
216+
/**
217+
* Adds manually managed [StreamScope] to the current context.
218+
*/
219+
@OptIn(ExperimentalContracts::class)
220+
@ExperimentalRPCApi
221+
public suspend fun <T> withStreamScope(scope: StreamScope, block: suspend CoroutineScope.() -> T): T {
222+
contract {
223+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
183224
}
225+
226+
currentCoroutineContext().checkContextForStreamScope()
227+
228+
return withContext(scope.contextElement, block)
184229
}
185230

186231
/**

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.MutableStateFlow
99
import kotlinx.coroutines.flow.first
1010
import kotlinx.coroutines.flow.toList
11+
import kotlinx.rpc.krpc.StreamScope
1112
import kotlinx.rpc.krpc.internal.STREAM_SCOPES_ENABLED
1213
import kotlinx.rpc.krpc.invokeOnStreamScopeCompletion
1314
import kotlinx.rpc.krpc.streamScoped
15+
import kotlinx.rpc.krpc.withStreamScope
1416
import kotlinx.rpc.withService
1517
import kotlin.test.*
1618

@@ -586,6 +588,55 @@ class CancellationTest {
586588
stopAllAndJoin()
587589
}
588590

591+
@Test
592+
fun manualStreamScopeNoCancel() = runCancellationTest {
593+
val myJob = Job()
594+
val streamScope = StreamScope(myJob)
595+
596+
val unrelatedJob = Job()
597+
598+
var first: Int = -1
599+
val deferredFlow = CoroutineScope(unrelatedJob).async {
600+
withStreamScope(streamScope) {
601+
service.incomingStream().apply { first = first() }
602+
}
603+
}
604+
val flow= deferredFlow.await()
605+
606+
serverInstance().fence.complete(Unit)
607+
val consumed = flow.toList()
608+
609+
assertEquals(0, first)
610+
assertContentEquals(listOf(1), consumed)
611+
612+
stopAllAndJoin()
613+
}
614+
615+
@Test
616+
fun manualStreamScopeWithCancel() = runCancellationTest {
617+
val myJob = Job()
618+
val streamScope = StreamScope(myJob)
619+
620+
val unrelatedJob = Job()
621+
622+
var first: Int = -1
623+
val deferredFlow = CoroutineScope(unrelatedJob).async {
624+
withStreamScope(streamScope) {
625+
service.incomingStream().apply { first = first() }
626+
}
627+
}
628+
val flow= deferredFlow.await()
629+
630+
streamScope.close()
631+
serverInstance().fence.complete(Unit)
632+
val consumed = flow.toList()
633+
634+
assertEquals(0, first)
635+
assertContentEquals(emptyList(), consumed)
636+
637+
stopAllAndJoin()
638+
}
639+
589640
@Test
590641
fun testCancelledClientCancelsRequest() = runCancellationTest {
591642
launch {

0 commit comments

Comments
 (0)