Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions StandardLibrary/Sources/LibC.hylo
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public fun memmove(
@ffi("sysconf")
internal fun sysconf(name: C.int) -> C.long

/// Binding of the C library's indirect system call function `syscall`, with a parameter list
/// tailored for use with `SYS_futex`.
///
/// `number` selects the system call; the remaining parameters mirror the arguments of the Linux
/// `futex` system call (`uaddr`, `futex_op`, `val`, `timeout`, `uaddr2`, `val3`).
/// Callers in this library treat a result of `-1` as a failure and then read `errno`.
///
/// NOTE(review): C declares `syscall` as a variadic function; this fixed-arity binding presumably
/// relies on the target ABI passing these arguments identically -- confirm for supported targets.
///
/// See https://www.man7.org/linux/man-pages/man2/futex.2.html.
@ffi("syscall")
internal fun syscall_for_futex(number: C.long, uaddr: MemoryAddress, futex_op: C.int, val: UInt32, timeout: MemoryAddress, uaddr2: MemoryAddress, val3: UInt32) -> C.long

/// Gives the current thread time slice back to the OS scheduler.
///
/// The thread is moved to the end of the queue for its static priority and a new thread gets to run.
Expand Down
55 changes: 55 additions & 0 deletions StandardLibrary/Sources/LowLevel/ErrorHandling.hylo
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

/// Reads the current value of `errno`.
///
/// On Linux and macOS the value is copied out of the location handed back by the platform's
/// `errno` accessor (`__errno_location` and `__error`, respectively). On any other operating
/// system the result is always `0`.
///
/// See: https://www.man7.org/linux/man-pages/man3/errno.3.html
internal fun errno() -> C.int {
  #if os(Linux)
    let location = Pointer<C.int>(type_punning: __errno_location())
    return location.unsafe[].copy()
  #elseif os(macOS)
    let location = Pointer<C.int>(type_punning: __error())
    return location.unsafe[].copy()
  #else
    return 0
  #endif
}

/// Reports the error stored in `errno`, described by `operation`, and aborts the program when
/// `is_failure` is `true`; does nothing otherwise.
///
/// TODO: `errno` failures are typically recoverable, so terminating the program is too drastic.
/// For now, assertions are the best error-handling tool available. To be fixed later.
internal fun report_errno_failure_if(_ is_failure: Bool, operation: String) {
  // Nothing to report on success.
  if !is_failure {
    return
  }
  report_errno_failure(operation: operation)
}

/// Reports the error stored in `errno` with description `operation` and aborts the program.
///
/// The report consists of the C library's description of the error, written by `perror`, and a
/// line of the form `Error: <operation>; errno: <value>`, written by `print`.
///
/// TODO: We should not terminate the program on `errno` failure, as these are typically recoverable errors.
/// However, currently the best way to handle errors is with assertions. To be fixed later.
internal fun report_errno_failure(operation: String) {
  // Capture `errno` before performing any I/O: library calls are allowed to overwrite it.
  let err = errno()
  // Ask the C library to describe the error while `errno` still holds the value set by the
  // failing call; calling `perror` after the `print` calls below could describe an unrelated
  // error left behind by the output machinery.
  perror(.null())
  print("Error: ", terminator: "")
  print(operation, terminator: "")
  print("; errno: ", terminator: "")
  print(Int(truncating_or_extending: err))
  fatal_error("Aborting due to error")
}

/// Returns the address of `errno`.
///
/// On Linux, `errno` is implemented in terms of `__errno_location`; the result is meant to be
/// reinterpreted as a pointer to `C.int` and read through, as `errno()` does.
@ffi("__errno_location")
internal fun __errno_location() -> MemoryAddress

/// Returns the address of `errno`.
///
/// On macOS, `errno` is implemented in terms of `__error`; the result is meant to be
/// reinterpreted as a pointer to `C.int` and read through, as `errno()` does.
@ffi("__error")
internal fun __error() -> MemoryAddress

/// Produces a message on standard error describing the last error encountered during a call to a system or library function.
///
/// `s` is the address of a C string used as a prefix for the message. Per the linked manual page,
/// passing a null address prints the error description without a prefix.
///
/// See: https://www.man7.org/linux/man-pages/man3/perror.3.html
@ffi("perror")
internal fun perror(_ s: MemoryAddress)
74 changes: 74 additions & 0 deletions StandardLibrary/Sources/LowLevel/Threading/Mutex.hylo
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@

/// A mutual exclusion object that allows multiple threads to synchronize access to a shared resource.
///
/// The shared resource is held inside the mutex, and can only be accessed through a lock.
///
/// This object is by definition shareable; for convenience, none of the methods are `inout`,
/// even though they mutate internal state (through pointers obtained from `inner_as_mutable` and
/// `data_as_mutable`).
///
/// A mutex is, by default, a bottleneck; it prevents more than one thread from making progress at
/// the same time.
public type Mutex<T: Deinitializable & Movable>: Deinitializable, Movable {

/// The OS-level implementation of the mutex.
internal var inner: OSFutexMutex
/// The data that is being protected by the mutex.
internal var data: T

/// Constructs `self` to protect `data`.
public init(_ data: sink T) {
&self.inner = .new()
&self.data = data
}

/// Deinitializes `self`, deinitializing the OS-level mutex and then the protected data.
public fun deinit() sink {
&inner.deinit()
&data.deinit()
}

/// Accesses the protected data within the mutex, calling `f` with a mutable reference to the data.
///
/// The lock is acquired before `f` is called and released after `f` returns, so `f` runs
/// entirely inside the critical section.
public fun access<E>(within f: inout [E](inout T) inout -> Void) {
inner_as_mutable().unsafe[].lock()
&f(&(data_as_mutable().unsafe[]))
inner_as_mutable().unsafe[].unlock()
}
/// Tries to access the protected data within the mutex, calling `f` with a mutable reference to the data.
/// Returns `true` if the lock was acquired and `f` executed, `false` otherwise.
///
/// NOTE(review): unlike `access`, `f` is taken here as a non-`inout` closure -- confirm that the
/// difference between the two signatures is intended.
public fun try_access<E>(within f: [E](inout T) -> Void) -> Bool {
if inner_as_mutable().unsafe[].try_lock() {
f(&(data_as_mutable().unsafe[]))
inner_as_mutable().unsafe[].unlock()
return true
}
return false
}

/// Deinitializes `self` and returns the protected data.
///
/// NOTE(review): `inner` is not explicitly deinitialized here -- confirm that sinking `self`
/// takes care of it.
public fun unwrap() sink -> T {
// No need for locking here, as we are sinking.
data_as_mutable().unsafe_pointee()
}

/// Returns a mutable pointer to our inner implementation object.
///
/// Mutability is added to a pointer to the stored `inner` property; this is what lets the
/// non-`inout` methods of `self` lock and unlock the mutex.
fun inner_as_mutable() -> PointerToMutable<OSFutexMutex> {
return PointerToMutable(adding_mutation_to: pointer[to: &inner])
}

/// Returns a mutable pointer to the protected data.
///
/// Mutability is added to a pointer to the stored `data` property; callers are expected to
/// dereference it only while holding the lock (or while sinking `self`).
fun data_as_mutable() -> PointerToMutable<T> {
return PointerToMutable(adding_mutation_to: pointer[to: &data])
}

}

extension Mutex where T: Copyable {

  /// Returns a copy of the data protected by the mutex.
  ///
  /// The mutex is locked for the duration of the copy and unlocked before returning.
  public fun copy_data() -> T {
    inner_as_mutable().unsafe[].lock()
    // Take the copy inside the critical section.
    var snapshot = data_as_mutable().unsafe[].copy()
    inner_as_mutable().unsafe[].unlock()
    return snapshot
  }

}
130 changes: 130 additions & 0 deletions StandardLibrary/Sources/LowLevel/Threading/OSFutexFunctions.hylo
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/// Blocks until the value stored in `futex` is different from `expected`.
///
/// Returns immediately when the value already differs. On Linux and macOS the wait is delegated
/// to the OS; on any other platform the function busy-loops until the value changes.
public fun futex_wait(futex: inout UInt32, expected: UInt32) {
  while true {
    // Re-read the futex on every iteration; we are done as soon as it no longer holds `expected`.
    var observed: UInt32 = .new(value: Builtin.atomic_load_relaxed_i32(Builtin.address(of: futex.value)))
    if observed != expected {
      return
    }

    #if os(Linux)
      LinuxFutex.futex_wait(futex: &futex, expected: expected)
    #elseif os(macOS)
      MacOSFutex.futex_wait(futex: &futex, expected: expected)
    #else
      // No OS support: spin until the value changes.
    #endif
  }
}

/// Wakes up threads waiting on `futex`.
///
/// The exact number of woken threads depends on the platform:
/// - On Linux, `threads_to_wake` is forwarded as the count of the `FUTEX_WAKE_PRIVATE` operation.
/// - On macOS, one waiter is woken when `threads_to_wake` is `1`; for any other value all
///   waiters are woken.
/// - On other platforms, nothing is done.
///
/// NOTE(review): the Linux futex manual describes the wake count as an upper bound ("at most"),
/// so callers should not rely on at least `threads_to_wake` threads being woken -- confirm.
public fun futex_wake(futex: inout UInt32, threads_to_wake: UInt32) {
#if os(Linux)
LinuxFutex.futex_wake(futex: &futex, threads_to_wake: threads_to_wake)
#elseif os(macOS)
MacOSFutex.futex_wake(futex: &futex, threads_to_wake: threads_to_wake)
#else
// Do nothing; assume the thread never really went to sleep.
#endif
}

/// Linux implementation of the futex operations, built on the `futex` system call.
namespace LinuxFutex {

  /// The number of the `futex` system call.
  ///
  /// NOTE(review): 98 is the number used by the generic Linux system call table (e.g., aarch64);
  /// x86_64 uses 202 -- confirm this value against every architecture the library targets.
  let SYS_futex: C.long = 98
  /// The futex operation that waits on a futex word.
  let FUTEX_WAIT: C.int = 0
  /// The futex operation that wakes waiters of a futex word.
  let FUTEX_WAKE: C.int = 1
  /// `FUTEX_WAIT` combined with the process-private flag.
  let FUTEX_WAIT_PRIVATE: C.int = 128
  /// `FUTEX_WAKE` combined with the process-private flag.
  let FUTEX_WAKE_PRIVATE: C.int = 129
  /// The `errno` value reported when the futex word did not contain the expected value.
  let EAGAIN: C.int = 11
  /// The `errno` value reported when a blocking call was interrupted by a signal.
  let EINTR: C.int = 4

  /// Linux implementation for `futex_wait`.
  ///
  /// Returns when the thread is woken up, when the value at `futex` was already different from
  /// `expected`, or when the wait was interrupted by a signal. The function does not guarantee
  /// that the value changed; callers must re-check it. Any other failure is reported and aborts
  /// the program.
  internal fun futex_wait(futex: inout UInt32, expected: UInt32) {
    let err = syscall_for_futex(
      number: SYS_futex,
      uaddr: MemoryAddress(type_punning: mutable_pointer[to: &futex]),
      futex_op: FUTEX_WAIT_PRIVATE,
      val: expected,
      timeout: .null(),
      uaddr2: .null(),
      val3: 0)
    if err == -1 {
      let code = errno()
      // The value was different, without the need for sleeping.
      if code == EAGAIN {
        return
      }
      // A signal interrupted the wait; this is not an error, the caller re-checks the value
      // and waits again if needed.
      if code == EINTR {
        return
      }
      // Otherwise, it was an error.
      report_errno_failure(operation: "futex_wait")
    }
  }

  /// Linux implementation for `futex_wake`.
  ///
  /// Forwards `threads_to_wake` as the waiter count of the wake operation; a failure of the
  /// system call is reported and aborts the program.
  internal fun futex_wake(futex: inout UInt32, threads_to_wake: UInt32) {
    var r = syscall_for_futex(
      number: SYS_futex,
      uaddr: MemoryAddress(type_punning: mutable_pointer[to: &futex]),
      futex_op: FUTEX_WAKE_PRIVATE,
      val: threads_to_wake,
      timeout: .null(),
      uaddr2: .null(),
      val3: 0)
    report_errno_failure_if(r == -1, operation: "futex_wake")
  }

}

/// macOS implementation of the futex operations, built on the `os_sync_*` address-wait API.
namespace MacOSFutex {

  /// An atomic compare-and-wait operation with a timeout, used to implement higher-level synchronization primitives.
  ///
  /// See https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_timeout
  @ffi("os_sync_wait_on_address_with_timeout")
  internal fun os_sync_wait_on_address_with_timeout(addr: MemoryAddress, value: UInt64, size: C.size_t, flags: UInt32, clockid: UInt32, timeout_ns: UInt64) -> C.int

  /// An atomic operation that wakes one thread blocked on a futex wait, used to implement higher-level synchronization primitives.
  ///
  /// See https://developer.apple.com/documentation/os/os_sync_wake_by_address_any
  @ffi("os_sync_wake_by_address_any")
  internal fun os_sync_wake_by_address_any(addr: MemoryAddress, size: C.size_t, flags: UInt32) -> Int64
  // TODO: why do we need to return Int64? documentation says it returns C.int

  /// An atomic operation that wakes all threads blocked on a futex wait, used to implement higher-level synchronization primitives.
  ///
  /// See https://developer.apple.com/documentation/os/os_sync_wake_by_address_all
  @ffi("os_sync_wake_by_address_all")
  internal fun os_sync_wake_by_address_all(addr: MemoryAddress, size: C.size_t, flags: UInt32) -> Int64
  // TODO: why do we need to return Int64? documentation says it returns C.int

  /// The flags value requesting default behavior from the `os_sync_*` functions.
  let OS_SYNC_WAIT_ON_ADDRESS_NONE: UInt32 = 0
  /// The identifier of the Mach absolute time clock, used to interpret wait timeouts.
  let OS_CLOCK_MACH_ABSOLUTE_TIME: UInt32 = 32
  /// The `errno` value reported by the wake functions when no thread is waiting on the address.
  let ENOENT: C.int = 2
  /// The `errno` value reported when a blocking call was interrupted by a signal.
  let EINTR: C.int = 4

  /// macOS implementation for `futex_wait`.
  ///
  /// Returns when the thread is woken up or when the wait was interrupted by a signal. The
  /// function does not guarantee that the value changed; callers must re-check it. Any other
  /// failure is reported and aborts the program.
  internal fun futex_wait(futex: inout UInt32, expected: UInt32) {
    let r = os_sync_wait_on_address_with_timeout(
      addr: MemoryAddress(type_punning: mutable_pointer[to: &futex]),
      value: UInt64(truncating_or_extending: expected),
      size: 4,
      flags: OS_SYNC_WAIT_ON_ADDRESS_NONE,
      clockid: OS_CLOCK_MACH_ABSOLUTE_TIME,
      timeout_ns: UInt64.max())
    if r == -1 {
      // A signal interrupted the wait; this is not an error, the caller re-checks the value
      // and waits again if needed.
      if errno() == EINTR {
        return
      }
      report_errno_failure(operation: "futex_wait")
    }
  }

  /// macOS implementation for `futex_wake`.
  ///
  /// Wakes one waiter if `threads_to_wake` is `1`, and all waiters otherwise. Finding no waiter
  /// is not an error; any other failure is reported and aborts the program.
  internal fun futex_wake(futex: inout UInt32, threads_to_wake: UInt32) {
    if threads_to_wake == 1 {
      let r = os_sync_wake_by_address_any(
        addr: MemoryAddress(type_punning: mutable_pointer[to: &futex]),
        size: 4,
        flags: OS_SYNC_WAIT_ON_ADDRESS_NONE)
      report_wake_failure_if(r == -1)
    } else {
      let r = os_sync_wake_by_address_all(
        addr: MemoryAddress(type_punning: mutable_pointer[to: &futex]),
        size: 4,
        flags: OS_SYNC_WAIT_ON_ADDRESS_NONE)
      report_wake_failure_if(r == -1)
    }
  }

  /// If `is_failure`, reports the failure of a wake operation and aborts the program, unless
  /// `errno` only indicates that no thread was waiting.
  ///
  /// A wake can legitimately find no waiter: a thread may have marked the futex as contended
  /// without having gone to sleep yet, or may have already stopped waiting.
  internal fun report_wake_failure_if(_ is_failure: Bool) {
    if !is_failure {
      return
    }
    if errno() == ENOENT {
      return
    }
    report_errno_failure(operation: "futex_wake")
  }

}

119 changes: 119 additions & 0 deletions StandardLibrary/Sources/LowLevel/Threading/OSFutexMutex.hylo
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/// The futex value of a mutex that is not locked.
let UNLOCKED: UInt32 = 0
/// The futex value of a mutex that is locked, with no thread known to be waiting for it.
let LOCKED: UInt32 = 1
/// The futex value of a mutex that is locked and contended: a thread may be sleeping on the
/// futex, so unlocking must wake a waiter.
let CONTENDED: UInt32 = 2

// TODO: should be internal, but that currently leads to a linker error when Mutex<T> tries to use this.

/// A thin mutex implemented in terms of an OS futex.
///
/// Behaves like a pthread mutex, but is more efficient, as the operations don't involve a system
/// call if the mutex is not contended.
/// In contended cases, the thread is put to sleep and woken up by the OS. This allows efficient
/// usage of thread resources.
///
/// Traps if it is deinitialized while locked.
/// NOTE(review): the previous documentation said the mutex traps if it is *moved* while locked;
/// only the check in `deinit` is visible here -- confirm the behavior on move.
public type OSFutexMutex: Deinitializable, Movable {

/// The futex value; one of `UNLOCKED`, `LOCKED`, or `CONTENDED`.
// TODO: use atomic value here.
var futex: UInt32

/// Initializes `self` with a new, unlocked mutex.
public init() {
&futex = UNLOCKED.copy()
}

/// Deinitializes `self`.
///
/// Requires that the mutex is unlocked.
public fun deinit() sink {
precondition(futex == UNLOCKED, "Mutex is being deinitialized while locked.")
}

/// Locks the mutex, blocking until the lock is acquired.
public fun lock() inout {
// Fast path: try to move from UNLOCKED to LOCKED.
// Use the acquire semantic to create a barrier at the beginning of the critical section.
let r = atomic_cmpxchg_acquire_relaxed_i32(&futex, expected: UNLOCKED, desired: LOCKED)
// `r.0` is used as the success flag of the exchange.
if !r.0 {
lock_contended()
}
}

/// Unlocks the mutex, waking up one waiting thread if the mutex was contended.
public fun unlock() inout {
// Use the release semantic to create a barrier at the end of the critical section.
let old = atomic_swap_release_i32(&futex, UNLOCKED)
// precondition(old == LOCKED, "Mutex is being unlocked while not locked.")
if old == CONTENDED {
// Some thread is sleeping, wake it up.
// Waking up one thread is better for concurrency; eventually all threads will be woken up.
futex_wake(futex: &futex, threads_to_wake: 1)
}
}

/// Attempts to lock the mutex. Returns `true` if the lock was acquired, `false` otherwise.
///
/// Never blocks: a single attempt is made to move from UNLOCKED to LOCKED.
public fun try_lock() inout -> Bool {
// Use the acquire semantic to create a barrier at the beginning of the critical section.
let r = atomic_cmpxchg_acquire_relaxed_i32(&futex, expected: UNLOCKED, desired: LOCKED)
return r.0
}

/// Called when we are in a contended case to lock the mutex.
///
/// Spins for a while, then alternates between marking the mutex as CONTENDED and sleeping on
/// the futex until the lock is acquired.
fun lock_contended() inout {
// Spin for a little while.
// If the mutex is released quickly, we avoid the overhead of a system call.
var state = spin()

// If we are unlocked, attempt to lock the mutex but don't mark it as CONTENDED.
if state == UNLOCKED {
let r = atomic_cmpxchg_acquire_relaxed_i32(&futex, expected: UNLOCKED, desired: LOCKED)
if r.0 {
return
}
// The exchange failed; `r.1` is used as the value observed in the futex.
&state = r.1
}

// We are now in a contended state.
while true {
if state != CONTENDED {
// Try to mark the mutex as contended.
if atomic_swap_acquire_i32(&futex, CONTENDED) == UNLOCKED {
// We successfully moved from unlocked to contended, so we successfully locked it.
return;
}
}

// Wait for the futex to change state, assuming it is still CONTENDED.
futex_wait(futex: &futex, expected: CONTENDED)

// Spin again after waking up.
&state = spin()
}
}

/// Spins on the CPU to avoid the overhead of a system call.
/// Exits early when we are not in the locked state.
/// Returns the current state of the mutex.
fun spin() inout -> UInt32 {
// The maximum number of times the state is re-read while it is LOCKED.
var spin_counter = 16000
while true {
// Load the current value of the state.
// Use the acquire semantic to create a barrier at the beginning of the critical section.
// Stop spinning when either:
// - we are unlocked -- thus we can now acquire the lock
// - we are contended -- thus we should wait with a syscall
// - we have spun enough
let state = atomic_load_acquire_i32(&futex)
if state != LOCKED || spin_counter == 0 {
return state
}

// TODO: use spin-waiting-hint to not keep the CPU so busy.

&spin_counter -= 1
}
return UNLOCKED.copy() // should never reach here
}

}
Loading
Loading