Skip to content

Commit 7cf452e

Browse files
committed
Dispatchers default behaviour is changed to schedule new coroutine for execution later.
Job.getInactiveCancellationException is introduced. yield is cancellable while it is waiting to executed. core tutorial is expanded.
1 parent 55888f2 commit 7cf452e

File tree

23 files changed

+347
-240
lines changed

23 files changed

+347
-240
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package kotlinx.coroutines.experimental
22

3+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
34
import kotlin.coroutines.Continuation
5+
import kotlin.coroutines.ContinuationInterceptor
46
import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
57
import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
68
import kotlin.coroutines.suspendCoroutine
@@ -37,25 +39,39 @@ public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (Ca
3739
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
3840
val job = cont.context[Job]
3941
// fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
40-
job?.isActive?.let { if (!it) throw CancellationException() }
42+
if (job != null && !job.isActive) throw job.getInactiveCancellationException()
4143
return job
4244
}
4345

4446
@PublishedApi
4547
internal class SafeCancellableContinuation<in T>(
4648
private val delegate: Continuation<T>,
47-
parentJob: Job?
49+
private val parentJob: Job?
4850
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
4951
// only updated from the thread that invoked suspendCancellableCoroutine
50-
private var suspendedThread: Thread? = Thread.currentThread()
52+
53+
@Volatile
54+
private var decision = UNDECIDED
55+
56+
private companion object {
57+
val DECISION: AtomicIntegerFieldUpdater<SafeCancellableContinuation<*>> =
58+
AtomicIntegerFieldUpdater.newUpdater(SafeCancellableContinuation::class.java, "decision")
59+
60+
const val UNDECIDED = 0
61+
const val SUSPENDED = 1
62+
const val RESUMED = 2
63+
const val YIELD = 3 // used by cancellable "yield"
64+
}
5165

5266
init { initParentJob(parentJob) }
5367

5468
fun getResult(): Any? {
55-
if (suspendedThread != null) {
56-
suspendedThread = null
57-
return SUSPENDED_MARKER
69+
val decision = this.decision // volatile read
70+
when (decision) {
71+
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return SUSPENDED_MARKER
72+
YIELD -> return SUSPENDED_MARKER
5873
}
74+
// otherwise, afterCompletion was already invoked, and the result is in the state
5975
val state = getState()
6076
if (state is CompletedExceptionally) throw state.exception
6177
return state
@@ -66,15 +82,21 @@ internal class SafeCancellableContinuation<in T>(
6682

6783
@Suppress("UNCHECKED_CAST")
6884
override fun afterCompletion(state: Any?) {
69-
if (suspendedThread === Thread.currentThread()) {
70-
// cancelled during suspendCancellableCoroutine in its thread
71-
suspendedThread = null
72-
} else {
73-
// cancelled later or in other thread
74-
if (state is CompletedExceptionally)
75-
delegate.resumeWithException(state.exception)
76-
else
77-
delegate.resume(state as T)
78-
}
85+
val decision = this.decision // volatile read
86+
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
87+
// otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
88+
if (state is CompletedExceptionally)
89+
delegate.resumeWithException(state.exception)
90+
else if (decision == YIELD && delegate is DispatchedContinuation)
91+
delegate.resumeYield(parentJob, state as T)
92+
else
93+
delegate.resume(state as T)
94+
}
95+
96+
// can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
97+
fun resumeYield(value: T) {
98+
if ((context[ContinuationInterceptor] as? CoroutineDispatcher)?.isDispatchNeeded(context) == true)
99+
DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
100+
resume(value)
79101
}
80102
}
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package kotlinx.coroutines.experimental
22

3-
import java.util.concurrent.Executor
3+
import java.util.concurrent.ExecutorService
44
import java.util.concurrent.Executors
55
import java.util.concurrent.atomic.AtomicInteger
66
import kotlin.coroutines.CoroutineContext
@@ -12,21 +12,21 @@ import kotlin.coroutines.CoroutineContext
1212
* When available, it wraps `ForkJoinPool.commonPool` and provides a similar shared pool where not.
1313
*/
1414
object CommonPool : CoroutineDispatcher() {
15-
private val pool: Executor = findPool()
15+
private val pool: ExecutorService = findPool()
1616

1717
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
1818

19-
private fun findPool(): Executor {
19+
private fun findPool(): ExecutorService {
2020
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
2121
?: return createPlainPool()
22-
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? Executor }
22+
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
2323
?. let { return it }
24-
Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? Executor }
24+
Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
2525
?. let { return it }
2626
return createPlainPool()
2727
}
2828

29-
private fun createPlainPool(): Executor {
29+
private fun createPlainPool(): ExecutorService {
3030
val threadId = AtomicInteger()
3131
return Executors.newFixedThreadPool(defaultParallelism()) {
3232
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
@@ -35,6 +35,5 @@ object CommonPool : CoroutineDispatcher() {
3535

3636
private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
3737

38-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
3938
override fun dispatch(context: CoroutineContext, block: Runnable) = pool.execute(block)
4039
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ public abstract class CoroutineDispatcher :
2525
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
2626
/**
2727
* Return `true` if execution shall be dispatched onto another thread.
28+
* The default behaviour for most dispatchers is to return `true`.
2829
*/
29-
public abstract fun isDispatchNeeded(context: CoroutineContext): Boolean
30+
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
3031

3132
/**
3233
* Dispatches execution of a runnable [block] onto another thread in the given [context].
@@ -37,7 +38,7 @@ public abstract class CoroutineDispatcher :
3738
DispatchedContinuation<T>(this, continuation)
3839
}
3940

40-
private class DispatchedContinuation<T>(
41+
internal class DispatchedContinuation<T>(
4142
val dispatcher: CoroutineDispatcher,
4243
val continuation: Continuation<T>
4344
): Continuation<T> by continuation {
@@ -68,4 +69,22 @@ private class DispatchedContinuation<T>(
6869
continuation.resumeWithException(exception)
6970
}
7071
}
72+
73+
// used by "yield" implementation
74+
fun resumeYield(job: Job?, value: T) {
75+
val context = continuation.context
76+
if (dispatcher.isDispatchNeeded(context))
77+
dispatcher.dispatch(context, Runnable {
78+
withCoroutineContext(context) {
79+
if (job?.isActive == false)
80+
continuation.resumeWithException(job.getInactiveCancellationException())
81+
else
82+
continuation.resume(value)
83+
}
84+
})
85+
else
86+
withCoroutineContext(context) {
87+
continuation.resume(value)
88+
}
89+
}
7190
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kotlinx.coroutines.experimental
33
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
44
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
55
import java.util.concurrent.locks.LockSupport
6-
import kotlin.coroutines.Continuation
76
import kotlin.coroutines.CoroutineContext
87

98
/**
@@ -41,7 +40,7 @@ public interface EventLoop {
4140

4241
internal class EventLoopImpl(
4342
val thread: Thread
44-
) : CoroutineDispatcher(), EventLoop, Yield {
43+
) : CoroutineDispatcher(), EventLoop {
4544
val queue = LockFreeLinkedListHead()
4645
var parentJob: Job? = null
4746

@@ -50,26 +49,18 @@ internal class EventLoopImpl(
5049
this.parentJob = coroutine
5150
}
5251

53-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = Thread.currentThread() != thread
54-
5552
override fun dispatch(context: CoroutineContext, block: Runnable) {
5653
schedule(Dispatch(block))
5754
}
5855

59-
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
60-
val node = Resume(continuation)
61-
if (schedule(node))
62-
continuation.removeOnCompletion(node)
63-
}
64-
6556
fun schedule(node: Node): Boolean {
6657
val added = if (parentJob == null) {
6758
queue.addLast(node)
6859
true
6960
} else
7061
queue.addLastIf(node) { parentJob!!.isActive }
7162
if (added) {
72-
if (Thread.currentThread() != thread)
63+
if (Thread.currentThread() !== thread)
7364
LockSupport.unpark(thread)
7465
} else {
7566
node.run()
@@ -78,7 +69,7 @@ internal class EventLoopImpl(
7869
}
7970

8071
override fun processNextEvent(): Boolean {
81-
if (Thread.currentThread() != thread) return false
72+
if (Thread.currentThread() !== thread) return false
8273
(queue.removeFirstOrNull() as? Runnable)?.apply {
8374
run()
8475
return true
@@ -89,9 +80,5 @@ internal class EventLoopImpl(
8980
abstract class Node : LockFreeLinkedListNode(), Runnable
9081

9182
class Dispatch(block: Runnable) : Node(), Runnable by block
92-
93-
class Resume(val cont: Continuation<Unit>) : Node() {
94-
override fun run() = cont.resume(Unit)
95-
}
9683
}
9784

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
1313
ExecutorCoroutineDispatcher(this)
1414

1515
internal open class ExecutorCoroutineDispatcher(val executor: Executor) : CoroutineDispatcher(), Delay {
16-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
1716
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
1817

1918
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ public interface Job : CoroutineContext.Element {
3939
*/
4040
public val isActive: Boolean
4141

42+
/**
43+
* Returns [CancellationException] that [cancellable][suspendCancellableCoroutine] suspending functions throw when
44+
* trying to suspend in the context of this job. This function throws [IllegalAccessException] when invoked
45+
* for an [active][isActive] job.
46+
*/
47+
fun getInactiveCancellationException(): CancellationException
48+
4249
/**
4350
* Registers completion handler. The action depends on the state of this job.
4451
* When job is cancelled with [cancel], then the handler is immediately invoked
@@ -230,6 +237,15 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
230237

231238
final override val isActive: Boolean get() = state is Active
232239

240+
override fun getInactiveCancellationException(): CancellationException {
241+
val state = getState()
242+
return when (state) {
243+
is Active -> throw IllegalStateException("Job is still active")
244+
is CompletedExceptionally -> state.cancellationException
245+
else -> CancellationException("Job has completed with result")
246+
}
247+
}
248+
233249
final override fun onCompletion(handler: CompletionHandler): Job.Registration {
234250
var nodeCache: JobNode? = null
235251
while (true) { // lock-free loop on state
@@ -332,6 +348,17 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
332348
internal abstract class CompletedExceptionally {
333349
abstract val cancelReason: Throwable // original reason or fresh CancellationException
334350
abstract val exception: Throwable // the exception to be thrown in continuation
351+
352+
// convert cancelReason to CancellationException on first need
353+
@Volatile
354+
private var _cancellationException: CancellationException? = null
355+
356+
val cancellationException: CancellationException get() =
357+
_cancellationException ?: // atomic read volatile var or else build new
358+
(cancelReason as? CancellationException ?:
359+
CancellationException(cancelReason.message).apply { initCause(cancelReason) })
360+
.also { _cancellationException = it }
361+
335362
}
336363

337364
/**
@@ -343,24 +370,16 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
343370

344371
override val cancelReason: Throwable get() =
345372
_cancelReason ?: // atomic read volatile var or else create new
346-
CancellationException().also { _cancelReason = it }
373+
CancellationException("Job was cancelled without specified reason").also { _cancelReason = it }
347374

348-
@Volatile
349-
private var _exception: Throwable? = null // convert reason to CancellationException on first need
350-
351-
override val exception: Throwable get() =
352-
_exception ?: // atomic read volatile var or else build new
353-
(cancelReason as? CancellationException ?:
354-
CancellationException(cancelReason.message).apply { initCause(cancelReason) })
355-
.also { _exception = it }
375+
override val exception: Throwable get() = cancellationException
356376
}
357377

358378
/**
359379
* Represents a [state][getState] of a failed job.
360380
*/
361381
internal class Failed(override val exception: Throwable) : CompletedExceptionally() {
362-
override val cancelReason: Throwable
363-
get() = exception
382+
override val cancelReason: Throwable get() = exception
364383
}
365384
}
366385

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import java.util.concurrent.ScheduledThreadPoolExecutor
55
import java.util.concurrent.TimeUnit
66
import kotlin.coroutines.startCoroutine
77

8-
val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 50L)
8+
val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 1000L)
99

1010
internal val scheduledExecutor by lazy<ScheduledExecutorService> {
1111
ScheduledThreadPoolExecutor(1) { r ->
12-
Thread(r, "kotlinx.coroutines.ScheduledExecutor")
12+
Thread(r, "kotlinx.coroutines.ScheduledExecutor").apply { isDaemon = true }
1313
}.apply {
1414
setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS)
1515
allowCoreThreadTimeOut(true)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import java.util.concurrent.Executors
44
import java.util.concurrent.ScheduledExecutorService
55
import java.util.concurrent.TimeUnit
66
import java.util.concurrent.atomic.AtomicInteger
7-
import kotlin.concurrent.thread
87
import kotlin.coroutines.CoroutineContext
98

109
/**
@@ -30,34 +29,22 @@ fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null):
3029
return job + ThreadPoolDispatcher(nThreads, name, job)
3130
}
3231

33-
private val thisThreadContext = ThreadLocal<ThreadPoolDispatcher>()
34-
3532
private class ThreadPoolDispatcher(
3633
nThreads: Int,
3734
name: String,
3835
val job: Job
39-
) : CoroutineDispatcher(), Yield, Delay {
36+
) : CoroutineDispatcher(), Delay {
4037
val threadNo = AtomicInteger()
4138
val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
42-
thread(start = false, isDaemon = true,
43-
name = if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) {
44-
thisThreadContext.set(this@ThreadPoolDispatcher)
45-
target.run()
46-
}
39+
Thread(target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()).apply { isDaemon = true }
4740
}
4841

4942
init {
5043
job.onCompletion { executor.shutdown() }
5144
}
5245

53-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = thisThreadContext.get() != this
54-
5546
override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
5647

57-
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
58-
executor.scheduleResume(continuation)
59-
}
60-
6148
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
6249
executor.scheduleResumeAfterDelay(time, unit, continuation)
6350
}

0 commit comments

Comments
 (0)