Skip to content

Commit a88e151

Browse files
committed
Timeout semantics
1 parent 2ff2e31 commit a88e151

File tree

9 files changed

+183
-36
lines changed

9 files changed

+183
-36
lines changed

atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt renamed to atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.atomicfu.locks
33
import kotlin.contracts.ExperimentalContracts
44
import kotlin.contracts.InvocationKind
55
import kotlin.contracts.contract
6+
import kotlin.time.Duration
67

78
/**
89
* Mutual exclusion for Kotlin Multiplatform.
@@ -27,12 +28,7 @@ import kotlin.contracts.contract
2728
* }
2829
* ```
2930
*/
30-
expect class Mutex() {
31-
/**
32-
* Returns `true` if this mutex is locked.
33-
*/
34-
fun isLocked(): Boolean
35-
31+
expect class SynchronousMutex() {
3632
/**
3733
* Tries to lock this mutex, returning `false` if this mutex is already locked.
3834
*
@@ -41,6 +37,8 @@ expect class Mutex() {
4137
* lock acquisition.
4238
*/
4339
fun tryLock(): Boolean
40+
41+
fun tryLock(timeout: Duration): Boolean
4442

4543
/**
4644
* Locks the mutex, suspends the thread until the lock is acquired.
@@ -68,7 +66,7 @@ expect class Mutex() {
6866
* @return result of [block]
6967
*/
7068
@OptIn(ExperimentalContracts::class)
71-
inline fun <T> Mutex.withLock(block: () -> T): T {
69+
inline fun <T> SynchronousMutex.withLock(block: () -> T): T {
7270
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
7371
lock()
7472
return try {

atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package kotlinx.atomicfu.locks
33
import kotlinx.atomicfu.AtomicRef
44
import kotlinx.atomicfu.atomic
55
import kotlin.time.Duration
6+
import kotlin.time.TimeSource
67

78
internal class NativeMutex(
8-
val park : () -> Unit = { ParkingSupport.park(Duration.INFINITE) },
9+
val park : (Duration) -> Unit = { ParkingSupport.park(it) },
910
val unpark : (ParkingHandle) -> Unit = ParkingSupport::unpark,
1011
) {
1112
/**
@@ -38,16 +39,19 @@ internal class NativeMutex(
3839
private val owningThread = atomic<ParkingHandle?>(null)
3940
private val state = atomic(0)
4041
private val holdCount = atomic(0)
41-
42-
42+
4343
fun lock() {
44+
tryLock(Duration.INFINITE)
45+
}
46+
47+
fun tryLock(duration: Duration): Boolean {
4448
val currentParkingHandle = ParkingSupport.currentThreadHandle()
4549

4650
// Has to be checked in this order!
4751
if (holdCount.value > 0 && currentParkingHandle == owningThread.value) {
4852
// Is reentring thread
4953
holdCount.incrementAndGet()
50-
return
54+
return true
5155
}
5256

5357
// Otherwise try acquire lock
@@ -56,20 +60,23 @@ internal class NativeMutex(
5660
if (newState == 1) {
5761
owningThread.value = currentParkingHandle
5862
holdCount.incrementAndGet()
59-
return
63+
return true
6064
}
6165

6266
// If state larger than 1 -> enqueue and park
6367
// When woken up thread has acquired lock and his node in the queue is therefore at the head.
6468
// Remove head
6569
if (newState > 1) {
6670
val prevNode = parkingQueue.enqueue()
67-
prevNode.nodeWait()
71+
// if timeout
72+
if (!prevNode.nodeWait(duration)) return false
6873
parkingQueue.dequeue()
6974
owningThread.value = currentParkingHandle
7075
holdCount.incrementAndGet()
71-
return
76+
return true
7277
}
78+
79+
return true
7380
}
7481

7582
fun unlock() {
@@ -88,16 +95,18 @@ internal class NativeMutex(
8895

8996
// If waiters wake up the first in line. The woken up thread will dequeue the node.
9097
if (currentState > 0) {
91-
val nextParker = parkingQueue.getHead()
92-
nextParker.nodeWake()
98+
var nextParker = parkingQueue.getHead()
99+
// If cancelled And there are other waiting nodes, go to next
100+
while (!nextParker.nodeWake() && state.decrementAndGet() > 0) {
101+
// We only dequeue here in case of timeoud out node.
102+
// Dequeueing woken nodes can lead to issues when pre-unparked.
103+
parkingQueue.dequeue()
104+
nextParker = parkingQueue.getHead()
105+
}
93106
return
94107
}
95108
}
96109

97-
fun isLocked(): Boolean {
98-
return state.value > 0
99-
}
100-
101110
fun tryLock(): Boolean {
102111
val currentThreadId = ParkingSupport.currentThreadHandle()
103112
if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) {
@@ -149,30 +158,42 @@ internal class NativeMutex(
149158
val parker = atomic<Any>(Empty)
150159
val next = atomic<Node?>(null)
151160

152-
fun nodeWait() {
161+
fun nodeWait(duration: Duration): Boolean {
162+
val deadline = TimeSource.Monotonic.markNow() + duration
153163
while (true) {
154164
when (parker.value) {
155-
Empty -> if (parker.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) park()
156-
is ParkingHandle -> park()
157-
Awoken -> return
165+
Empty -> if (parker.compareAndSet(Empty, ParkingSupport.currentThreadHandle())) {
166+
park(deadline - TimeSource.Monotonic.markNow())
167+
if (deadline < TimeSource.Monotonic.markNow())
168+
parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled)
169+
}
170+
is ParkingHandle -> {
171+
park(deadline - TimeSource.Monotonic.markNow())
172+
if (deadline < TimeSource.Monotonic.markNow())
173+
parker.compareAndSet(ParkingSupport.currentThreadHandle(), Cancelled)
174+
}
175+
Awoken -> return true
176+
Cancelled -> return false
158177
}
159178
}
160179
}
161180

162-
fun nodeWake() {
181+
fun nodeWake(): Boolean {
163182
while (true) {
164183
when (val currentState = parker.value) {
165-
Empty -> if (parker.compareAndSet(Empty, Awoken)) return
184+
Empty -> if (parker.compareAndSet(Empty, Awoken)) return true
166185
is ParkingHandle -> if (parker.compareAndSet(currentState, Awoken)) {
167186
unpark(currentState)
168-
return
187+
return true
169188
}
170189
Awoken -> throw IllegalStateException("Node is already woken")
190+
Cancelled -> return false
171191
}
172192
}
173193
}
174194
}
175195

176196
private object Empty
177197
private object Awoken
198+
private object Cancelled
178199
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import kotlinx.atomicfu.atomic
4+
import kotlin.random.Random
5+
import kotlin.test.Test
6+
import kotlin.test.assertEquals
7+
import kotlin.time.Duration.Companion.milliseconds
8+
9+
class LockWithTimoutTests {
10+
11+
// Helper class with atomic counter in constructor
12+
class AtomicCounter(initialValue: Int = 0) {
13+
val counter = atomic(initialValue)
14+
15+
fun incrementAndGet(): Int = counter.incrementAndGet()
16+
val value: Int get() = counter.value
17+
}
18+
19+
@Test
20+
fun timeoutLockStressTest() {
21+
val mutex = SynchronousMutex()
22+
val counter = AtomicCounter(0)
23+
val targetCount = 1000
24+
val threads = mutableListOf<TestThread>()
25+
26+
// Create 5 test threads
27+
repeat(5) { threadId ->
28+
val thread = testThread {
29+
while (counter.value < targetCount) {
30+
// Try to acquire the lock with a timeout
31+
if (mutex.tryLock((Random.nextInt(1, 10)).milliseconds)) {
32+
try {
33+
// Increment the counter if lock was acquired
34+
if (counter.value < targetCount) {
35+
counter.incrementAndGet()
36+
}
37+
// Random sleep to increase variation
38+
sleepMillis(Random.nextInt(0, 5).toLong())
39+
} finally {
40+
mutex.unlock()
41+
}
42+
}
43+
44+
// Random sleep between attempts to increase variation
45+
sleepMillis(Random.nextInt(0, 3).toLong())
46+
}
47+
}
48+
threads.add(thread)
49+
}
50+
51+
// Wait for all threads to complete
52+
threads.forEach { it.join() }
53+
54+
// Verify the counter reached the target
55+
assertEquals(targetCount, counter.value)
56+
}
57+
58+
}

atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class NativeMutexTest {
4949

5050
@Test
5151
fun testNativeMutexFast() {
52-
val mutex = Mutex()
52+
val mutex = SynchronousMutex()
5353
val resultList = mutableListOf<String>()
5454

5555
val fut1 = testThread {
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package kotlinx.atomicfu.locks
22

3+
import kotlin.time.Duration
4+
35
/**
46
* Part of multiplatform mutex.
57
* Since this mutex will run in a single threaded environment, it doesn't provide any real synchronization.
68
*
79
* It does keep track of reentrancy.
810
*/
9-
actual class Mutex {
11+
actual class SynchronousMutex {
1012
private var state = 0
11-
actual fun isLocked(): Boolean = state != 0
1213
actual fun tryLock(): Boolean = true
14+
actual fun tryLock(timeout: Duration): Boolean = true
1315
actual fun lock(): Unit { state++ }
1416
actual fun unlock(): Unit { if (state-- < 0) throw IllegalStateException("Mutex already unlocked") }
1517
}

atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt renamed to atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/SynchronousMutex.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package kotlinx.atomicfu.locks
22

3+
import java.util.concurrent.TimeUnit
4+
import kotlin.time.Duration
5+
36
/**
47
* This mutex uses a [ReentrantLock].
58
*
69
* [getReentrantLock] obtains the actual [ReentrantLock].
7-
* Construct with `Mutex(reentrantLock)` to create a [Mutex] that uses an existing instance of [ReentrantLock].
10+
* Construct with `Mutex(reentrantLock)` to create a [SynchronousMutex] that uses an existing instance of [ReentrantLock].
811
*/
9-
actual class Mutex(private val reentrantLock: java.util.concurrent.locks.ReentrantLock) {
12+
actual class SynchronousMutex(private val reentrantLock: java.util.concurrent.locks.ReentrantLock) {
1013
actual constructor(): this(ReentrantLock())
11-
actual fun isLocked(): Boolean = reentrantLock.isLocked
14+
actual fun tryLock(timeout: Duration): Boolean = reentrantLock.tryLock(timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS)
1215
actual fun tryLock(): Boolean = reentrantLock.tryLock()
1316
actual fun lock() = reentrantLock.lock()
1417
actual fun unlock() = reentrantLock.unlock()
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package kotlinx.atomicfu.locks
2+
3+
import org.jetbrains.kotlinx.lincheck.annotations.Operation
4+
import org.jetbrains.kotlinx.lincheck.check
5+
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions
6+
import org.jetbrains.kotlinx.lincheck.util.LoggingLevel
7+
import org.junit.Test
8+
import java.util.concurrent.ConcurrentHashMap
9+
import kotlin.time.Duration.Companion.nanoseconds
10+
11+
class NativeMutexTimeoutLincheckTest {
12+
class Counter {
13+
@Volatile
14+
private var value = 0
15+
16+
fun inc(): Int = ++value
17+
fun get() = value
18+
}
19+
private val counter = Counter()
20+
private val localParkers = ConcurrentHashMap<ParkingHandle, ThreadParker>()
21+
22+
private val lock = NativeMutex(
23+
park = { localParkers[ParkingSupport.currentThreadHandle()]!!.parkNanos(it.inWholeNanoseconds) },
24+
unpark = { localParkers[it]!!.unpark() }
25+
)
26+
27+
@Test
28+
fun modelCheckingTest(): Unit = ModelCheckingOptions()
29+
.iterations(2) // Change to 300 for exhaustive testing
30+
.invocationsPerIteration(5_000)
31+
.actorsBefore(1)
32+
.threads(3)
33+
.actorsPerThread(3)
34+
.actorsAfter(0)
35+
.hangingDetectionThreshold(100)
36+
.logLevel(LoggingLevel.INFO)
37+
.check(this::class.java)
38+
39+
@Operation
40+
fun incNoTimout() {
41+
localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() }
42+
lock.lock()
43+
counter.inc()
44+
lock.unlock()
45+
}
46+
47+
@Operation
48+
fun incTimeout() {
49+
localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() }
50+
if (lock.tryLock(0.nanoseconds)) {
51+
counter.inc()
52+
lock.unlock()
53+
}
54+
}
55+
56+
@Operation
57+
fun get() {
58+
localParkers.computeIfAbsent(ParkingSupport.currentThreadHandle()) { ThreadParker() }
59+
lock.lock()
60+
counter.get()
61+
lock.unlock()
62+
}
63+
}

atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package kotlinx.atomicfu.locks
22

33
public actual open class SynchronizedObject {
44

5-
private val nativeMutex = Mutex()
5+
private val nativeMutex = SynchronousMutex()
66
public fun lock() = nativeMutex.lock()
77
public fun tryLock(): Boolean = nativeMutex.tryLock()
88
public fun unlock() = nativeMutex.unlock()
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package kotlinx.atomicfu.locks
22

3-
actual class Mutex {
3+
import kotlin.time.Duration
4+
5+
actual class SynchronousMutex {
46
private val lock = NativeMutex()
5-
actual fun isLocked() = lock.isLocked()
67
actual fun tryLock() = lock.tryLock()
8+
actual fun tryLock(timeout: Duration) = lock.tryLock(timeout)
79
actual fun lock() = lock.lock()
810
actual fun unlock() = lock.unlock()
911
}

0 commit comments

Comments
 (0)