Skip to content

Commit 8552f0e

Browse files
committed
Fixed delay in arbitrary contexts
Fixes #133
1 parent 8839673 commit 8552f0e

File tree

2 files changed

+83
-4
lines changed

2 files changed

+83
-4
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,13 @@ internal class CancellableContinuationImpl<in T>(
249249
}
250250

251251
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
252-
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
253-
resumeImpl(value, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
252+
val dc = delegate as? DispatchedContinuation
253+
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
254254
}
255255

256256
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
257-
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
258-
resumeWithExceptionImpl(exception, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
257+
val dc = delegate as? DispatchedContinuation
258+
resumeWithExceptionImpl(exception, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
259259
}
260260

261261
override fun nameString(): String =
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental
18+
19+
import org.hamcrest.MatcherAssert.assertThat
20+
import org.hamcrest.core.IsEqual
21+
import org.junit.Test
22+
import java.util.concurrent.Executor
23+
import java.util.concurrent.Executors
24+
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
25+
import kotlin.coroutines.experimental.Continuation
26+
import kotlin.coroutines.experimental.ContinuationInterceptor
27+
import kotlin.coroutines.experimental.CoroutineContext
28+
29+
class DelayTest : TestBase() {
30+
/**
31+
* Test that delay works properly in contexts with custom [ContinuationInterceptor]
32+
*/
33+
@Test
34+
fun testDelayInArbitraryContext() = runBlocking {
35+
var thread: Thread? = null
36+
val pool = Executors.newFixedThreadPool(1) { runnable ->
37+
Thread(runnable).also { thread = it }
38+
}
39+
val context = CustomInterceptor(pool)
40+
val c = async(context) {
41+
assertThat(Thread.currentThread(), IsEqual(thread))
42+
delay(100)
43+
assertThat(Thread.currentThread(), IsEqual(thread))
44+
42
45+
}
46+
assertThat(c.await(), IsEqual(42))
47+
pool.shutdown()
48+
}
49+
50+
51+
@Test
52+
fun testDelayWithoutDispatcher() = runBlocking(CoroutineName("testNoDispatcher.main")) {
53+
// launch w/o a specified dispatcher
54+
val c = async(CoroutineName("testNoDispatcher.inner")) {
55+
delay(100)
56+
42
57+
}
58+
assertThat(c.await(), IsEqual(42))
59+
}
60+
61+
class CustomInterceptor(val pool: Executor) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
62+
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
63+
Wrapper(pool, continuation)
64+
}
65+
66+
class Wrapper<T>(val pool: Executor, val cont: Continuation<T>) : Continuation<T> {
67+
override val context: CoroutineContext
68+
get() = cont.context
69+
70+
override fun resume(value: T) {
71+
pool.execute { cont.resume(value) }
72+
}
73+
74+
override fun resumeWithException(exception: Throwable) {
75+
pool.execute { cont.resumeWithException(exception) }
76+
}
77+
}
78+
79+
}

0 commit comments

Comments
 (0)