Skip to content

Commit 78832e3

Browse files
committed
Use final Job state in undispatched coroutine completion
* When using in-place coroutine builders, startUndispatchedOrReturn will return block's final value on fast-path. This is not correct because if any of the children had failed, whether or not exception is thrown depend only on child timings. Moreover, in the face of such race exception may be ignored if scope owner wasn't cancelled or is missing. * Introduce special hack for withTimeout and withTimeoutOrNull to ignore TimeoutCancellationException on the fast path * withContext, coroutineScope, supervisorScope, withTimeout and withTimeoutOrNull are affected and now check cancellation on fast-path
1 parent bafe7fb commit 78832e3

File tree

6 files changed

+163
-29
lines changed

6 files changed

+163
-29
lines changed

common/kotlinx-coroutines-core-common/src/Timeout.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private fun <U, T: U> setupTimeout(
7474
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
7575
// restart block using new coroutine with new job,
7676
// however start it as undispatched coroutine, because we are already in the proper context
77-
return coroutine.startUndispatchedOrReturn(coroutine, block)
77+
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block, coroutine)
7878
}
7979

8080
private open class TimeoutCoroutine<U, in T: U>(

common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,29 +70,69 @@ private inline fun <T> startDirect(completion: Continuation<T>, block: () -> Any
7070
}
7171

7272
/**
73-
* Starts this coroutine with the given code [block] in the same context and returns result when it
73+
* Starts this coroutine with the given code [block] in the same context and returns coroutine result when it
7474
* completes without suspension.
7575
* This function shall be invoked at most once on this coroutine.
76+
* This function checks cancellation of the outer [Job] on fast-path.
7677
*
7778
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
7879
* during construction. Second, it starts the coroutine using [startCoroutineUninterceptedOrReturn].
7980
*/
8081
internal fun <T, R> AbstractCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
8182
initParentJob()
82-
return undispatchedResult { block.startCoroutineUninterceptedOrReturn(receiver, this) }
83+
return undispatchedResult({ true }) { block.startCoroutineUninterceptedOrReturn(receiver, this) }
8384
}
8485

85-
private inline fun <T> AbstractCoroutine<T>.undispatchedResult(startBlock: () -> Any?): Any? {
86+
/**
87+
* Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path.
88+
*/
89+
internal fun <T, R> AbstractCoroutine<T>.startUndispatchedOrReturnIgnoreTimeout(
90+
receiver: R, block: suspend R.() -> T,
91+
timeoutCoroutine: Job
92+
): Any? {
93+
initParentJob()
94+
return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === timeoutCoroutine) })
95+
{ block.startCoroutineUninterceptedOrReturn(receiver, this) }
96+
}
97+
98+
private inline fun <T> AbstractCoroutine<T>.undispatchedResult(
99+
shouldThrow: (Throwable) -> Boolean,
100+
startBlock: () -> Any?
101+
): Any? {
86102
val result = try {
87103
startBlock()
88104
} catch (e: Throwable) {
89105
CompletedExceptionally(e)
90106
}
107+
108+
/*
109+
* We're trying to complete our undispatched block here and have three code-paths:
110+
* 1) Suspended.
111+
*
112+
* Or we are completing our block (and its job).
113+
* 2) If we can't complete it, we suspend, probably waiting for children (2)
114+
* 3) If we have successfully completed the whole coroutine here in an undispatched manner,
115+
* we should decide which result to return. We have two options: either return proposed update or actual final state.
116+
* But if fact returning proposed value is not an option, otherwise we will ignore possible cancellation or child failure.
117+
*
118+
* shouldThrow parameter is a special code path for timeout coroutine:
119+
* If timeout is exceeded, but withTimeout() block was not suspended, we would like to return block value,
120+
* not a timeout exception.
121+
*/
91122
return when {
92123
result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
93124
makeCompletingOnce(result, MODE_IGNORE) -> {
94-
if (result is CompletedExceptionally) throw result.cause else result
125+
val state = state
126+
if (state is CompletedExceptionally) {
127+
if (shouldThrow(state.cause)) {
128+
throw state.cause
129+
} else {
130+
result
131+
}
132+
} else {
133+
state
134+
}
95135
}
96136
else -> COROUTINE_SUSPENDED
97137
}
98-
}
138+
}

common/kotlinx-coroutines-core-common/test/TestBase.common.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.coroutines
66

7+
import kotlin.coroutines.*
8+
79
public expect open class TestBase constructor() {
810
public val isStressTest: Boolean
911
public val stressTestMultiplier: Int
@@ -26,3 +28,13 @@ public class TestException1(message: String? = null) : Throwable(message)
2628
public class TestException2(message: String? = null) : Throwable(message)
2729
public class TestException3(message: String? = null) : Throwable(message)
2830
public class TestRuntimeException(message: String? = null) : RuntimeException(message)
31+
32+
// Wrap context to avoid fast-paths on dispatcher comparison
33+
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
34+
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
35+
return object : CoroutineDispatcher() {
36+
override fun dispatch(context: CoroutineContext, block: Runnable) {
37+
dispatcher.dispatch(context, block)
38+
}
39+
}
40+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
import kotlin.test.*
9+
10+
class UndispatchedResultTest : TestBase() {
11+
12+
@Test
13+
fun testWithContext() = runTest {
14+
invokeTest { block -> withContext(wrapperDispatcher(coroutineContext), block) }
15+
}
16+
17+
@Test
18+
fun testWithContextFastPath() = runTest {
19+
invokeTest { block -> withContext(coroutineContext, block) }
20+
}
21+
22+
@Test
23+
fun testWithTimeout() = runTest {
24+
invokeTest { block -> withTimeout(Long.MAX_VALUE, block) }
25+
}
26+
27+
@Test
28+
fun testAsync() = runTest {
29+
invokeTest { block -> async(NonCancellable, block = block).await() }
30+
}
31+
32+
@Test
33+
fun testCoroutineScope() = runTest {
34+
invokeTest { block -> coroutineScope(block) }
35+
}
36+
37+
private suspend fun invokeTest(scopeProvider: suspend (suspend CoroutineScope.() -> Unit) -> Unit) {
38+
invokeTest(EmptyCoroutineContext, scopeProvider)
39+
invokeTest(Unconfined, scopeProvider)
40+
}
41+
42+
private suspend fun invokeTest(
43+
context: CoroutineContext,
44+
scopeProvider: suspend (suspend CoroutineScope.() -> Unit) -> Unit
45+
) {
46+
try {
47+
scopeProvider { block(context) }
48+
} catch (e: TestException) {
49+
finish(5)
50+
reset()
51+
}
52+
}
53+
54+
private suspend fun CoroutineScope.block(context: CoroutineContext) {
55+
try {
56+
expect(1)
57+
// Will cancel its parent
58+
async(context) {
59+
expect(2)
60+
throw TestException()
61+
}.await()
62+
} catch (e: TestException) {
63+
expect(3)
64+
}
65+
expect(4)
66+
}
67+
}

common/kotlinx-coroutines-core-common/test/WithContextTest.kt

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package kotlinx.coroutines
99

10-
import kotlin.coroutines.*
1110
import kotlin.test.*
1211

1312
class WithContextTest : TestBase() {
@@ -85,20 +84,25 @@ class WithContextTest : TestBase() {
8584
}
8685
expect(2)
8786
val job = Job()
88-
val result = withContext(coroutineContext + job) { // same context + new job
89-
expect(3) // still here
90-
job.cancel() // cancel out job!
91-
try {
92-
yield() // shall throw CancellationException
93-
expectUnreached()
94-
} catch (e: CancellationException) {
95-
expect(4)
87+
try {
88+
withContext(coroutineContext + job) {
89+
// same context + new job
90+
expect(3) // still here
91+
job.cancel() // cancel out job!
92+
try {
93+
yield() // shall throw CancellationException
94+
expectUnreached()
95+
} catch (e: CancellationException) {
96+
expect(4)
97+
}
98+
"OK"
9699
}
97-
"OK"
100+
101+
expectUnreached()
102+
} catch (e: CancellationException) {
103+
expect(5)
104+
// will wait for the first coroutine
98105
}
99-
assertEquals("OK", result)
100-
expect(5)
101-
// will wait for the first coroutine
102106
}
103107

104108
@Test
@@ -300,13 +304,4 @@ class WithContextTest : TestBase() {
300304
}
301305
finish(5)
302306
}
303-
304-
private fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
305-
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
306-
return object : CoroutineDispatcher() {
307-
override fun dispatch(context: CoroutineContext, block: Runnable) {
308-
dispatcher.dispatch(context, block)
309-
}
310-
}
311-
}
312307
}

core/kotlinx-coroutines-core/test/WithTimeoutOrNullJvmTest.kt

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,24 @@ class WithTimeoutOrNullJvmTest : TestBase() {
2121
// outer timeout results in null
2222
assertEquals(null, result)
2323
}
24-
}
24+
25+
@Test
26+
fun testIgnoredTimeout() = runTest {
27+
val value =withTimeout(1) {
28+
Thread.sleep(10)
29+
42
30+
}
31+
32+
assertEquals(42, value)
33+
}
34+
35+
@Test
36+
fun testIgnoredTimeoutOnNull() = runTest {
37+
val value = withTimeoutOrNull(1) {
38+
Thread.sleep(10)
39+
42
40+
}
41+
42+
assertEquals(42, value)
43+
}
44+
}

0 commit comments

Comments
 (0)