16
16
17
17
package kotlinx.coroutines.experimental
18
18
19
- import kotlin.browser.*
20
19
import kotlin.coroutines.experimental.*
20
+ import org.w3c.dom.*
21
21
22
- internal object JSDispatcher : CoroutineDispatcher(), Delay {
23
- // Check if we are in the browser and must use postMessage to avoid setTimeout throttling
24
- private val messageQueue =
25
- if (jsTypeOf(window) != " undefined" ) MessageQueue ().apply { register() } else null
26
-
22
+ internal class NodeDispatcher : CoroutineDispatcher (), Delay {
27
23
override fun dispatch (context : CoroutineContext , block : Runnable ) {
28
- if (messageQueue != null ) {
29
- messageQueue.enqueue(block)
30
- } else {
31
- setTimeout({ block.run () }, 0 )
32
- }
24
+ setTimeout({ block.run () }, 0 )
33
25
}
34
26
35
27
override fun scheduleResumeAfterDelay (time : Int , continuation : CancellableContinuation <Unit >) {
@@ -46,60 +38,105 @@ internal object JSDispatcher : CoroutineDispatcher(), Delay {
46
38
}
47
39
}
48
40
49
- // it is open for tests
50
- internal open class MessageQueue {
51
- val yieldEvery = 16 // yield to JS event loop after this many processed messages
52
-
53
- private val messageName = " JSDispatcher.dispatch"
54
- private var scheduled = false
41
+ internal class WindowDispatcher (private val window : Window ) : CoroutineDispatcher(), Delay {
42
+ private val messageName = " dispatchCoroutine"
55
43
56
- private var queue = arrayOfNulls<Runnable >(8 )
57
- private var head = 0
58
- private var tail = 0
44
+ private val queue = object : MessageQueue () {
45
+ override fun schedule () {
46
+ window.postMessage(messageName, " *" )
47
+ }
48
+ }
59
49
60
- fun register () {
50
+ init {
61
51
window.addEventListener(" message" , { event: dynamic ->
62
52
if (event.source == window && event.data == messageName) {
63
53
event.stopPropagation()
64
- process()
54
+ queue. process()
65
55
}
66
56
}, true )
67
57
}
68
58
69
- // it is open for tests
70
- open fun schedule () {
71
- window.postMessage(messageName, " *" )
59
+ override fun dispatch (context : CoroutineContext , block : Runnable ) {
60
+ queue.enqueue(block)
61
+ }
62
+
63
+ override fun scheduleResumeAfterDelay (time : Int , continuation : CancellableContinuation <Unit >) {
64
+ window.setTimeout({ with (continuation) { resumeUndispatched(Unit ) } }, time.coerceAtLeast(0 ))
65
+ }
66
+
67
+ override fun invokeOnTimeout (time : Int , block : Runnable ): DisposableHandle {
68
+ val handle = window.setTimeout({ block.run () }, time.coerceAtLeast(0 ))
69
+ return object : DisposableHandle {
70
+ override fun dispose () {
71
+ window.clearTimeout(handle)
72
+ }
73
+ }
74
+ }
75
+ }
76
+
77
+ internal abstract class MessageQueue : Queue <Runnable >() {
78
+ val yieldEvery = 16 // yield to JS event loop after this many processed messages
79
+
80
+ private var scheduled = false
81
+
82
+ abstract fun schedule ()
83
+
84
+ fun enqueue (element : Runnable ) {
85
+ add(element)
86
+ if (! scheduled) {
87
+ scheduled = true
88
+ schedule()
89
+ }
72
90
}
73
91
92
+ fun process () {
93
+ try {
94
+ // limit number of processed messages
95
+ repeat(yieldEvery) {
96
+ val element = poll() ? : return @process
97
+ element.run ()
98
+ }
99
+ } finally {
100
+ if (isEmpty) {
101
+ scheduled = false
102
+ } else {
103
+ schedule()
104
+ }
105
+ }
106
+ }
107
+ }
108
+
109
+ internal open class Queue <T : Any > {
110
+ private var queue = arrayOfNulls<Any ?>(8 )
111
+ private var head = 0
112
+ private var tail = 0
113
+
74
114
val isEmpty get() = head == tail
75
115
76
- fun poll (): Runnable ? {
116
+ fun poll (): T ? {
77
117
if (isEmpty) return null
78
118
val result = queue[head]!!
79
119
queue[head] = null
80
120
head = head.next()
81
- return result
121
+ @Suppress(" UNCHECKED_CAST" )
122
+ return result as T
82
123
}
83
124
84
- tailrec fun enqueue ( block : Runnable ) {
125
+ tailrec fun add ( element : T ) {
85
126
val newTail = tail.next()
86
127
if (newTail == head) {
87
128
resize()
88
- enqueue(block ) // retry with larger size
129
+ add(element ) // retry with larger size
89
130
return
90
131
}
91
- queue[tail] = block
132
+ queue[tail] = element
92
133
tail = newTail
93
- if (! scheduled) {
94
- scheduled = true
95
- schedule()
96
- }
97
134
}
98
135
99
- fun resize () {
136
+ private fun resize () {
100
137
var i = head
101
138
var j = 0
102
- val a = arrayOfNulls<Runnable >(queue.size * 2 )
139
+ val a = arrayOfNulls<Any ? >(queue.size * 2 )
103
140
while (i != tail) {
104
141
a[j++ ] = queue[i]
105
142
i = i.next()
@@ -113,22 +150,6 @@ internal open class MessageQueue {
113
150
val j = this + 1
114
151
return if (j == queue.size) 0 else j
115
152
}
116
-
117
- fun process () {
118
- try {
119
- // limit number of processed messages
120
- repeat(yieldEvery) {
121
- val block = poll() ? : return @process
122
- block.run ()
123
- }
124
- } finally {
125
- if (isEmpty) {
126
- scheduled = false
127
- } else {
128
- schedule()
129
- }
130
- }
131
- }
132
153
}
133
154
134
155
// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
0 commit comments