Skip to content

Commit e812e65

Browse files
committed
Introduce mutex based on new parking
1 parent 7935e64 commit e812e65

File tree

20 files changed

+627
-270
lines changed

20 files changed

+627
-270
lines changed

atomicfu/api/atomicfu.api

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,20 @@ public final class kotlinx/atomicfu/TraceKt {
135135
public static final fun named (Lkotlinx/atomicfu/TraceBase;Ljava/lang/String;)Lkotlinx/atomicfu/TraceBase;
136136
}
137137

138+
public final class kotlinx/atomicfu/locks/Mutex {
139+
public fun <init> ()V
140+
public fun <init> (Ljava/util/concurrent/locks/ReentrantLock;)V
141+
public final fun getReentrantLock ()Ljava/util/concurrent/locks/ReentrantLock;
142+
public final fun isLocked ()Z
143+
public final fun lock ()V
144+
public final fun tryLock ()Z
145+
public final fun unlock ()V
146+
}
147+
148+
public final class kotlinx/atomicfu/locks/MutexKt {
149+
public static final fun withLock (Lkotlinx/atomicfu/locks/Mutex;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
150+
}
151+
138152
public final class kotlinx/atomicfu/parking/KThread {
139153
public static final field Companion Lkotlinx/atomicfu/parking/KThread$Companion;
140154
}

atomicfu/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ kotlin {
119119

120120
jvmTest {
121121
dependencies {
122+
implementation("org.jetbrains.kotlinx:lincheck:2.35")
122123
implementation("org.jetbrains.kotlin:kotlin-reflect")
123124
implementation("org.jetbrains.kotlin:kotlin-test")
124125
implementation("org.jetbrains.kotlin:kotlin-test-junit")
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlin.contracts.ExperimentalContracts
4+
import kotlin.contracts.InvocationKind
5+
import kotlin.contracts.contract
6+
7+
/**
8+
* Mutual exclusion for Kotlin Multiplatform.
9+
*
10+
* It can protect a shared resource or critical section from multiple thread accesses.
11+
* Threads can acquire the lock by calling [lock] and release the lock by calling [unlock].
12+
*
13+
* When a thread calls [lock] while another thread is locked, it will suspend until the lock is released.
14+
* When multiple threads are waiting for the lock, they will acquire it in a fair order (first in first out).
15+
*
16+
* It is reentrant, meaning the lock holding thread can call [lock] multiple times without suspending.
17+
* To release the lock (after multiple [lock] calls) an equal number of [unlock] calls are required.
18+
*
19+
* This Mutex should not be used in combination with coroutines and `suspend` functions
20+
* as it blocks the waiting thread.
21+
* Use the `Mutex` from the coroutines library instead.
22+
*
23+
* ```Kotlin
24+
* mutex.withLock {
25+
* // Critical section only executed by
26+
* // one thread at a time.
27+
* }
28+
* ```
29+
*/
30+
expect class Mutex() {
31+
/**
32+
* Returns `true` if this mutex is locked.
33+
*/
34+
fun isLocked(): Boolean
35+
36+
/**
37+
* Tries to lock this mutex, returning `false` if this mutex is already locked.
38+
*
39+
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
40+
* released at the end of your critical section, and [unlock] is never invoked before a successful
41+
* lock acquisition.
42+
*/
43+
fun tryLock(): Boolean
44+
45+
/**
46+
* Locks the mutex, suspends the thread until the lock is acquired.
47+
*
48+
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
49+
* released at the end of your critical section, and [unlock] is never invoked before a successful
50+
* lock acquisition.
51+
*/
52+
fun lock()
53+
54+
/**
55+
* Releases the lock.
56+
* Throws [IllegalStateException] when the current thread is not holding the lock.
57+
*
58+
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
59+
* released at the end of the critical section, and [unlock] is never invoked before a successful
60+
* lock acquisition.
61+
*/
62+
fun unlock()
63+
}
64+
65+
/**
66+
* Executes the given code [block] under this mutex's lock.
67+
*
68+
* @return result of [block]
69+
*/
70+
@OptIn(ExperimentalContracts::class)
71+
inline fun <T> Mutex.withLock(block: () -> T): T {
72+
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
73+
lock()
74+
return try {
75+
block()
76+
} finally {
77+
unlock()
78+
}
79+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlinx.atomicfu.AtomicRef
4+
import kotlinx.atomicfu.atomic
5+
import kotlinx.atomicfu.parking.ThreadParker
6+
import kotlinx.atomicfu.parking.currentThreadId
7+
8+
internal class NativeMutex {
9+
/**
10+
* Mutex implementation for Kotlin/Native.
11+
* In concurrentMain sourceSet to be testable with Lincheck.
12+
*
13+
* The [state] variable stands for: 0 -> Lock is free
14+
* 1 -> Lock is locked but no waiters
15+
* 4 -> Lock is locked with 3 waiters
16+
*
17+
* The state.incrementAndGet() call makes my claim on the lock.
18+
* The returned value either means I acquired it (when it is 1).
19+
* Or I need to enqueue and park (when it is > 1).
20+
*
21+
* The [holdCount] variable is to enable reentrancy.
22+
*
23+
* Works by using a [parkingQueue].
24+
* When a thread tries to acquire the lock, but finds it is already locked it enqueues by appending to the [parkingQueue].
25+
* On enqueue the parking queue provides the second last node, this node is used to park on.
26+
* When our thread is woken up that means that the thread parked on the thrid last node called unpark on the second last node.
27+
* Since a woken up thread is first inline it means that it's node is the head and can therefore dequeue.
28+
*
29+
* Unlocking happens by calling state.decrementAndGet().
30+
* When the returned value is 0 it means the lock is free and we can simply return.
31+
* If the new state is > 0, then there are waiters. We wake up the first by unparking the head of the queue.
32+
* This even works when a thread is not parked yet,
33+
* since the ThreadParker can be pre-unparked resulting in the parking call to return immediately.
34+
*/
35+
private val parkingQueue = ParkingQueue()
36+
private val owningThread = atomic(-1L)
37+
private val state = atomic(0)
38+
private val holdCount = atomic(0)
39+
40+
41+
fun lock() {
42+
val currentThreadId = currentThreadId()
43+
44+
// Has to be checked in this order!
45+
if (holdCount.value > 0 && currentThreadId == owningThread.value) {
46+
// Is reentring thread
47+
holdCount.incrementAndGet()
48+
return
49+
}
50+
51+
// Otherwise try acquire lock
52+
val newState = state.incrementAndGet()
53+
// If new state 1 than I have acquired lock skipping queue.
54+
if (newState == 1) {
55+
owningThread.value = currentThreadId
56+
holdCount.incrementAndGet()
57+
return
58+
}
59+
60+
// If state larger than 1 -> enqueue and park
61+
// When woken up thread has acquired lock and his node in the queue is therefore at the head.
62+
// Remove head
63+
if (newState > 1) {
64+
val prevNode = parkingQueue.enqueue()
65+
prevNode.parker.park()
66+
parkingQueue.dequeue()
67+
owningThread.value = currentThreadId
68+
holdCount.incrementAndGet()
69+
return
70+
}
71+
}
72+
73+
fun unlock() {
74+
val currentThreadId = currentThreadId()
75+
val currentOwnerId = owningThread.value
76+
if (currentThreadId != currentOwnerId) throw IllegalStateException("Thread is not holding the lock")
77+
78+
// dec hold count
79+
val newHoldCount = holdCount.decrementAndGet()
80+
if (newHoldCount > 0) return
81+
if (newHoldCount < 0) throw IllegalStateException("Thread unlocked more than it locked")
82+
83+
// Lock is released by decrementing (only if decremented to 0)
84+
val currentState = state.decrementAndGet()
85+
if (currentState == 0) return
86+
87+
// If waiters wake up the first in line. The woken up thread will dequeue the node.
88+
if (currentState > 0) {
89+
val nextParker = parkingQueue.getHead()
90+
nextParker.parker.unpark()
91+
return
92+
}
93+
}
94+
95+
fun isLocked(): Boolean {
96+
return state.value > 0
97+
}
98+
99+
fun tryLock(): Boolean {
100+
val currentThreadId = currentThreadId()
101+
if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) {
102+
owningThread.value = currentThreadId
103+
holdCount.incrementAndGet()
104+
return true
105+
}
106+
return false
107+
}
108+
109+
// Based on Micheal-Scott Queue
110+
private class ParkingQueue {
111+
private val head: AtomicRef<Node>
112+
private val tail: AtomicRef<Node>
113+
114+
init {
115+
val first = Node()
116+
head = atomic(first)
117+
tail = atomic(first)
118+
}
119+
120+
fun getHead(): Node {
121+
return head.value
122+
}
123+
124+
fun enqueue(): Node {
125+
while (true) {
126+
val node = Node()
127+
val curTail = tail.value
128+
if (curTail.next.compareAndSet(null, node)) {
129+
tail.compareAndSet(curTail, node)
130+
return curTail
131+
}
132+
else tail.compareAndSet(curTail, curTail.next.value!!)
133+
}
134+
}
135+
136+
fun dequeue() {
137+
while (true) {
138+
val currentHead = head.value
139+
val currentHeadNext = currentHead.next.value ?: throw IllegalStateException("Dequeing parker but already empty, should not be possible")
140+
if (head.compareAndSet(currentHead, currentHeadNext)) return
141+
}
142+
}
143+
144+
}
145+
146+
private class Node {
147+
val parker = ThreadParker()
148+
val next = atomic<Node?>(null)
149+
}
150+
}

atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/parking/KThread.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ expect class Parker private constructor() {
3333
*/
3434
fun unpark(kThread: KThread): Unit
3535
}
36-
}
36+
}
37+
38+
internal expect fun currentThreadId(): Long
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlinx.atomicfu.parking.sleepMills
4+
import kotlinx.atomicfu.parking.testThread
5+
import kotlin.test.Test
6+
import kotlin.test.assertEquals
7+
8+
class NativeMutexTest {
9+
10+
11+
@Test
12+
fun testNativeMutexSlow() {
13+
val mutex = NativeMutex()
14+
val resultList = mutableListOf<String>()
15+
16+
val fut1 = testThread {
17+
repeat(30) { i ->
18+
mutex.lock()
19+
resultList.add("a$i")
20+
sleepMills(100)
21+
resultList.add("a$i")
22+
mutex.unlock()
23+
}
24+
}
25+
26+
val fut2 = testThread {
27+
repeat(30) { i ->
28+
mutex.lock()
29+
resultList.add("b$i")
30+
sleepMills(100)
31+
resultList.add("b$i")
32+
mutex.unlock()
33+
}
34+
}
35+
36+
repeat(30) { i ->
37+
mutex.lock()
38+
resultList.add("c$i")
39+
sleepMills(100)
40+
resultList.add("c$i")
41+
mutex.unlock()
42+
}
43+
fut1.join()
44+
fut2.join()
45+
46+
resultList.filterIndexed { i, _ -> i % 2 == 0 }
47+
.zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b ->
48+
assertEquals(a, b)
49+
}
50+
}
51+
52+
@Test
53+
fun testNativeMutexFast() {
54+
val mutex = Mutex()
55+
val resultList = mutableListOf<String>()
56+
57+
val fut1 = testThread {
58+
repeat(30000) { i ->
59+
mutex.lock()
60+
resultList.add("a$i")
61+
resultList.add("a$i")
62+
mutex.unlock()
63+
}
64+
}
65+
66+
val fut2 = testThread {
67+
repeat(30000) { i ->
68+
mutex.lock()
69+
resultList.add("b$i")
70+
resultList.add("b$i")
71+
mutex.unlock()
72+
}
73+
}
74+
75+
repeat(30000) { i ->
76+
mutex.lock()
77+
resultList.add("c$i")
78+
resultList.add("c$i")
79+
mutex.unlock()
80+
}
81+
fut1.join()
82+
fut2.join()
83+
84+
resultList
85+
.filterIndexed { i, _ -> i % 2 == 0 }
86+
.zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b ->
87+
assertEquals(a, b)
88+
}
89+
}
90+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlin.test.Test
4+
import kotlin.test.assertFails
5+
6+
class ReentrancyTests {
7+
8+
@Test
9+
fun reentrantTestSuccess() {
10+
val lock = NativeMutex()
11+
lock.lock()
12+
lock.lock()
13+
lock.unlock()
14+
lock.unlock()
15+
}
16+
17+
@Test
18+
fun reentrantTestFail() {
19+
val lock = NativeMutex()
20+
lock.lock()
21+
lock.lock()
22+
lock.unlock()
23+
lock.unlock()
24+
assertFails {
25+
lock.unlock()
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)