Skip to content

Commit ebc8866

Browse files
committed
AbstractCoroutines notifies onXXX before all the installed handlers;
so launch handles uncaught exceptions before "joining" coroutines wakeup. Fixes #208
1 parent dfeba24 commit ebc8866

File tree

10 files changed

+115
-28
lines changed

10 files changed

+115
-28
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public abstract class AbstractCoroutine<in T>(
8080
*/
8181
protected open fun onCancellation(cause: Throwable?) {}
8282

83-
internal final override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
83+
internal override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
8484
onCancellation(exceptionally?.cause)
8585
}
8686

@@ -94,13 +94,8 @@ public abstract class AbstractCoroutine<in T>(
9494
*/
9595
protected open fun onCompletedExceptionally(exception: Throwable) {}
9696

97-
/**
98-
* Override for post-completion actions that need to do something with the state.
99-
* @param mode completion mode.
100-
* @suppress **This is unstable API and it is subject to change.**
101-
*/
10297
@Suppress("UNCHECKED_CAST")
103-
internal override fun afterCompletion(state: Any?, mode: Int) {
98+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
10499
if (state is CompletedExceptionally)
105100
onCompletedExceptionally(state.exception)
106101
else

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ internal expect open class JobSupport(active: Boolean) : Job {
8787

8888
internal fun initParentJobInternal(parent: Job?)
8989
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean
90-
internal open fun afterCompletion(state: Any?, mode: Int)
90+
internal open fun onCompletionInternal(state: Any?, mode: Int)
9191
internal open fun onStartInternal()
9292
internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?)
9393
internal open fun nameString(): String
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import kotlin.test.*
20+
21+
class AbstractCoroutineTest : TestBase() {
22+
@Test
23+
fun testNotifications() = runTest {
24+
expect(1)
25+
val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
26+
override fun onStart() {
27+
expect(3)
28+
}
29+
30+
override fun onCancellation(cause: Throwable?) {
31+
assertTrue(cause == null)
32+
expect(5)
33+
}
34+
35+
override fun onCompleted(value: String) {
36+
assertEquals("OK", value)
37+
expect(6)
38+
}
39+
40+
override fun onCompletedExceptionally(exception: Throwable) {
41+
expectUnreached()
42+
}
43+
}
44+
coroutine.invokeOnCompletion {
45+
assertTrue(it == null)
46+
expect(7)
47+
}
48+
expect(2)
49+
coroutine.start()
50+
expect(4)
51+
coroutine.resume("OK")
52+
finish(8)
53+
}
54+
55+
@Test
56+
fun testNotificationsWithException() = runTest {
57+
expect(1)
58+
val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
59+
override fun onStart() {
60+
expect(3)
61+
}
62+
63+
override fun onCancellation(cause: Throwable?) {
64+
assertTrue(cause is TestException0)
65+
expect(5)
66+
}
67+
68+
override fun onCompleted(value: String) {
69+
expectUnreached()
70+
}
71+
72+
override fun onCompletedExceptionally(exception: Throwable) {
73+
assertTrue(exception is TestException1)
74+
expect(7)
75+
}
76+
}
77+
coroutine.invokeOnCompletion {
78+
assertTrue(it is TestException1)
79+
expect(8)
80+
}
81+
expect(2)
82+
coroutine.start()
83+
expect(4)
84+
coroutine.cancel(TestException0())
85+
expect(6)
86+
coroutine.resumeWithException(TestException1())
87+
finish(9)
88+
}
89+
90+
private class TestException0 : Throwable()
91+
private class TestException1 : Throwable()
92+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ internal abstract class AbstractContinuation<in T>(
7474
@PublishedApi
7575
internal fun getResult(): Any? {
7676
if (trySuspend()) return COROUTINE_SUSPENDED
77-
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
77+
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
7878
val state = this.state
7979
if (state is CompletedExceptionally) throw state.exception
8080
return getSuccessfulResult(state)
8181
}
8282

83-
override fun afterCompletion(state: Any?, mode: Int) {
83+
internal final override fun onCompletionInternal(state: Any?, mode: Int) {
8484
if (tryResume()) return // completed before getResult invocation -- bail out
8585
// otherwise, getResult has already commenced, i.e. completed later or in other thread
8686
dispatch(mode)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ private class BlockingCoroutine<T>(
218218
if (privateEventLoop) require(eventLoop is BlockingEventLoop)
219219
}
220220

221-
override fun afterCompletion(state: Any?, mode: Int) {
221+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
222222
// wake up blocked thread
223223
if (Thread.currentThread() != blockedThread)
224224
LockSupport.unpark(blockedThread)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -695,8 +695,11 @@ internal actual open class JobSupport actual constructor(active: Boolean) : Job,
695695
* @suppress **This is unstable API and it is subject to change.**
696696
*/
697697
internal fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
698-
// Invoke completion handlers
699698
val exceptionally = update as? CompletedExceptionally
699+
// Do overridable processing before completion handlers
700+
if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
701+
onCompletionInternal(update, mode)
702+
// Invoke completion handlers
700703
val cause = exceptionally?.cause
701704
if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
702705
try {
@@ -707,9 +710,6 @@ internal actual open class JobSupport actual constructor(active: Boolean) : Job,
707710
} else {
708711
expect.list?.notifyCompletion(cause)
709712
}
710-
// Do overridable processing after completion handlers
711-
if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
712-
afterCompletion(update, mode)
713713
}
714714

715715
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
@@ -1162,7 +1162,7 @@ internal actual open class JobSupport actual constructor(active: Boolean) : Job,
11621162
* @param mode completion mode.
11631163
* @suppress **This is unstable API and it is subject to change.**
11641164
*/
1165-
internal actual open fun afterCompletion(state: Any?, mode: Int) {}
1165+
internal actual open fun onCompletionInternal(state: Any?, mode: Int) {}
11661166

11671167
// for nicer debugging
11681168
public override fun toString(): String =

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private open class TimeoutCoroutine<U, in T: U>(
101101
}
102102

103103
@Suppress("UNCHECKED_CAST")
104-
override fun afterCompletion(state: Any?, mode: Int) {
104+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
105105
if (state is CompletedExceptionally)
106106
cont.resumeWithExceptionMode(state.exception, mode)
107107
else
@@ -171,7 +171,7 @@ private class TimeoutOrNullCoroutine<T>(
171171
cont: Continuation<T?>
172172
) : TimeoutCoroutine<T?, T>(time, unit, cont) {
173173
@Suppress("UNCHECKED_CAST")
174-
override fun afterCompletion(state: Any?, mode: Int) {
174+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
175175
if (state is CompletedExceptionally) {
176176
val exception = state.exception
177177
if (exception is TimeoutCancellationException && exception.coroutine === this)

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ internal abstract class AbstractContinuation<in T>(
6464
@PublishedApi
6565
internal fun getResult(): Any? {
6666
if (trySuspend()) return COROUTINE_SUSPENDED
67-
// otherwise, afterCompletion was already invoked & invoked tryResume, and the result is in the state
67+
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
6868
val state = this.state
6969
if (state is CompletedExceptionally) throw state.exception
7070
return getSuccessfulResult(state)
7171
}
7272

73-
override fun afterCompletion(state: Any?, mode: Int) {
73+
internal final override fun onCompletionInternal(state: Any?, mode: Int) {
7474
if (tryResume()) return // completed before getResult invocation -- bail out
7575
// otherwise, getResult has already commenced, i.e. completed later or in other thread
7676
dispatch(mode)

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -461,8 +461,11 @@ internal actual open class JobSupport actual constructor(active: Boolean) : Job
461461
* @suppress **This is unstable API and it is subject to change.**
462462
*/
463463
internal fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
464-
// Invoke completion handlers
465464
val exceptionally = update as? CompletedExceptionally
465+
// Do overridable processing before completion handlers
466+
if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
467+
onCompletionInternal(update, mode)
468+
// Invoke completion handlers
466469
val cause = exceptionally?.cause
467470
if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
468471
try {
@@ -473,9 +476,6 @@ internal actual open class JobSupport actual constructor(active: Boolean) : Job
473476
} else {
474477
expect.list?.notifyCompletion(cause)
475478
}
476-
// Do overridable processing after completion handlers
477-
if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
478-
afterCompletion(update, mode)
479479
}
480480

481481
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
@@ -861,11 +861,11 @@ internal actual open class JobSupport actual constructor(active: Boolean) : Job
861861
internal actual open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {}
862862

863863
/**
864-
* Override for post-completion actions that need to do something with the state.
864+
* Override for completion actions that need to do something with the state.
865865
* @param mode completion mode.
866866
* @suppress **This is unstable API and it is subject to change.**
867867
*/
868-
internal actual open fun afterCompletion(state: Any?, mode: Int) {}
868+
internal actual open fun onCompletionInternal(state: Any?, mode: Int) {}
869869

870870
// for nicer debugging
871871
public override fun toString(): String =

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ private open class TimeoutCoroutine<U, in T: U>(
7171
}
7272

7373
@Suppress("UNCHECKED_CAST")
74-
override fun afterCompletion(state: Any?, mode: Int) {
74+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
7575
if (state is CompletedExceptionally)
7676
cont.resumeWithExceptionMode(state.exception, mode)
7777
else
@@ -111,7 +111,7 @@ private class TimeoutOrNullCoroutine<T>(
111111
cont: Continuation<T?>
112112
) : TimeoutCoroutine<T?, T>(time, cont) {
113113
@Suppress("UNCHECKED_CAST")
114-
override fun afterCompletion(state: Any?, mode: Int) {
114+
internal override fun onCompletionInternal(state: Any?, mode: Int) {
115115
if (state is CompletedExceptionally) {
116116
val exception = state.exception
117117
if (exception is TimeoutCancellationException && exception.coroutine === this)

0 commit comments

Comments
 (0)