Skip to content

Commit 09b9d6c

Browse files
committed
Use FIFO queue in undispatched event loop
1 parent fd54bc4 commit 09b9d6c

File tree

6 files changed

+58
-82
lines changed

6 files changed

+58
-82
lines changed

common/kotlinx-coroutines-core-common/src/Dispatched.kt

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,51 +9,24 @@ import kotlin.coroutines.*
99
import kotlin.jvm.*
1010

1111
@Suppress("PrivatePropertyName")
12-
@JvmField
13-
internal val UNDEFINED = Symbol("UNDEFINED")
12+
private val UNDEFINED = Symbol("UNDEFINED")
1413

1514
@NativeThreadLocal
1615
internal object UndispatchedEventLoop {
1716
data class State(
1817
@JvmField var isActive: Boolean = false,
19-
@JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList()
18+
@JvmField val threadLocalQueue: ArrayQueue<Runnable> = ArrayQueue()
2019
)
2120

2221
@JvmField
2322
internal val state = CommonThreadLocal { State() }
2423

25-
fun dispatch(block: Runnable) {
26-
val state = state.get()
27-
if (state.isActive) {
28-
state.threadLocalQueue.add(block)
29-
return
30-
}
31-
32-
try {
33-
state.isActive = true
34-
block.run()
35-
while (!state.threadLocalQueue.isEmpty()) {
36-
val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
37-
element.run()
38-
}
39-
} catch (e: Throwable) {
40-
/*
41-
* This exception doesn't happen normally, only if user either submitted throwing runnable
42-
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
43-
*/
44-
state.threadLocalQueue.clear()
45-
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
46-
} finally {
47-
state.isActive = false
48-
}
49-
}
50-
5124
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
5225
val state = state.get()
5326
if (state.isActive) {
5427
continuation._state = contState
5528
continuation.resumeMode = mode
56-
state.threadLocalQueue.add(continuation)
29+
state.threadLocalQueue.addLast(continuation)
5730
return
5831
}
5932

@@ -63,7 +36,7 @@ internal object UndispatchedEventLoop {
6336
inline fun execute(task: DispatchedTask<*>, block: () -> Unit) {
6437
val state = state.get()
6538
if (state.isActive) {
66-
state.threadLocalQueue.add(task)
39+
state.threadLocalQueue.addLast(task)
6740
return
6841
}
6942

@@ -74,8 +47,8 @@ internal object UndispatchedEventLoop {
7447
try {
7548
state.isActive = true
7649
block()
77-
while (!state.threadLocalQueue.isEmpty()) {
78-
val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
50+
while (!state.threadLocalQueue.isEmpty) {
51+
val element = state.threadLocalQueue.removeFirst()
7952
element.run()
8053
}
8154
} catch (e: Throwable) {

common/kotlinx-coroutines-core-common/src/JobSupport.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1072,7 +1072,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
10721072
*/
10731073
internal suspend fun awaitInternal(): Any? {
10741074
// fast-path -- check state (avoid extra object creation)
1075-
while(true) { // lock-free loop on state
1075+
while (true) { // lock-free loop on state
10761076
val state = this.state
10771077
if (state !is Incomplete) {
10781078
// already complete -- just return result
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
internal class ArrayQueue<T : Any> {
8+
public val isEmpty: Boolean get() = head == tail
9+
10+
private var elements = arrayOfNulls<Any>(16)
11+
private var head = 0
12+
private var tail = 0
13+
14+
public fun addLast(element: T) {
15+
elements[tail] = element
16+
tail = (tail + 1) and elements.size - 1
17+
if (tail == head) ensureCapacity()
18+
}
19+
20+
@Suppress("UNCHECKED_CAST")
21+
public fun removeFirst(): T {
22+
require(head != tail) { "Queue is empty" }
23+
val element = elements[head]
24+
elements[head] = null
25+
head = (head + 1) and elements.size - 1
26+
return element!! as T
27+
}
28+
29+
public fun clear() {
30+
head = 0
31+
tail = 0
32+
elements = arrayOfNulls(elements.size)
33+
}
34+
35+
private fun ensureCapacity() {
36+
val currentSize = elements.size
37+
val newCapacity = currentSize shl 1
38+
val newElements = arrayOfNulls<Any>(newCapacity)
39+
val remaining = elements.size - head
40+
arraycopy(elements, head, newElements, 0, remaining)
41+
arraycopy(elements, 0, newElements, remaining, head)
42+
elements = newElements
43+
head = 0
44+
tail = currentSize
45+
}
46+
}

common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ class UnconfinedTest : TestBase() {
1616
launch {
1717
expect(4)
1818
launch {
19-
expect(7)
19+
expect(6)
2020
}
2121

2222
launch {
23-
expect(6)
23+
expect(7)
2424
}
2525
expect(5)
2626
}

core/kotlinx-coroutines-core/test/UnconfinedConcurrentTest.kt renamed to core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import org.junit.Test
99
import java.util.concurrent.*
1010
import kotlin.test.*
1111

12-
class UnconfinedConcurrentTest : TestBase() {
12+
class UnconfinedConcurrentStressTest : TestBase() {
1313
private val threads = 4
14-
private val executor = newFixedThreadPoolContext(threads, "UnconfinedConcurrentTest")
14+
private val executor = newFixedThreadPoolContext(threads, "UnconfinedConcurrentStressTest")
1515
private val threadLocal = ThreadLocal<Int>()
1616

1717
@After
@@ -21,7 +21,7 @@ class UnconfinedConcurrentTest : TestBase() {
2121

2222
@Test(timeout = 10_000L)
2323
fun testConcurrent() = runTest {
24-
val iterations = 10_000
24+
val iterations = 10_000 * stressTestMultiplier
2525
val startBarrier = CyclicBarrier(threads + 1)
2626
val finishLatch = CountDownLatch(threads)
2727

core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -137,49 +137,6 @@ class TestCoroutineContextTest {
137137
}.await()
138138
}
139139

140-
@Test
141-
fun testBlockingFunctionWithRunBlocking() = withTestContext(injectedContext) {
142-
val delay = 1000L
143-
val expectedValue = 16
144-
145-
val result = runBlocking {
146-
suspendedBlockingFunction(delay) {
147-
expectedValue
148-
}
149-
}
150-
151-
assertEquals(expectedValue, result)
152-
assertEquals(delay, now())
153-
}
154-
155-
@Test
156-
fun testBlockingFunctionWithAsync() = withTestContext(injectedContext) {
157-
val delay = 1000L
158-
val expectedValue = 16
159-
var now = 0L
160-
161-
val deferred = async {
162-
suspendedBlockingFunction(delay) {
163-
expectedValue
164-
}
165-
}
166-
167-
now += advanceTimeBy((delay / 4) - 1)
168-
assertEquals((delay / 4) - 1, now)
169-
assertEquals(now, now())
170-
try {
171-
deferred.getCompleted()
172-
fail("The Job should not have been completed yet.")
173-
} catch (e: Exception) {
174-
// Success.
175-
}
176-
177-
now += advanceTimeBy(1)
178-
assertEquals(delay, now())
179-
assertEquals(now, now())
180-
assertEquals(expectedValue, deferred.getCompleted())
181-
}
182-
183140
private suspend fun <T> TestCoroutineContext.suspendedBlockingFunction(delay: Long, function: () -> T): T {
184141
delay(delay / 4)
185142
return runBlocking {

0 commit comments

Comments
 (0)