Skip to content

Commit d039db8

Browse files
committed
store futexes in per-allocation data rather than globally
1 parent 9160aa0 commit d039db8

File tree

5 files changed

+107
-30
lines changed

5 files changed

+107
-30
lines changed

src/concurrency/sync.rs

+51-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::cell::RefCell;
12
use std::collections::VecDeque;
23
use std::collections::hash_map::Entry;
34
use std::ops::Not;
5+
use std::rc::Rc;
46
use std::time::Duration;
57

68
use rustc_abi::Size;
@@ -121,6 +123,15 @@ struct Futex {
121123
clock: VClock,
122124
}
123125

126+
#[derive(Default, Clone)]
127+
pub struct FutexRef(Rc<RefCell<Futex>>);
128+
129+
impl VisitProvenance for FutexRef {
130+
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
131+
// No provenance
132+
}
133+
}
134+
124135
/// A thread waiting on a futex.
125136
#[derive(Debug)]
126137
struct FutexWaiter {
@@ -137,9 +148,6 @@ pub struct SynchronizationObjects {
137148
rwlocks: IndexVec<RwLockId, RwLock>,
138149
condvars: IndexVec<CondvarId, Condvar>,
139150
pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
140-
141-
/// Futex info for the futex at the given address.
142-
futexes: FxHashMap<u64, Futex>,
143151
}
144152

145153
// Private extension trait for local helper methods
@@ -184,7 +192,7 @@ impl SynchronizationObjects {
184192
}
185193

186194
impl<'tcx> AllocExtra<'tcx> {
187-
pub fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
195+
fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
188196
self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>())
189197
}
190198
}
@@ -286,7 +294,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
286294
// is truly the only place where the data could be stored.
287295
this.check_ptr_access(ptr, Size::from_bytes(1), CheckInAllocMsg::InboundsTest)?;
288296

289-
let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0)?;
297+
let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0).unwrap();
290298
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?;
291299
// Due to borrow checker reasons, we have to do the lookup twice.
292300
if alloc_extra.get_sync::<T>(offset).is_none() {
@@ -296,6 +304,31 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
296304
interp_ok(alloc_extra.get_sync::<T>(offset).unwrap())
297305
}
298306

307+
/// If the pointer is inbounds of an allocation, get the synchronization primitive associated
308+
/// with the given pointer, or initialize a new one.
309+
///
310+
/// Otherwise, return `None`.
311+
fn try_get_sync_or_init<'a, T: 'static>(
312+
&'a mut self,
313+
ptr: Pointer,
314+
new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> InterpResult<'tcx, T>,
315+
) -> Option<&'a T>
316+
where
317+
'tcx: 'a,
318+
{
319+
let this = self.eval_context_mut();
320+
if this.ptr_try_get_alloc_id(ptr, 0).ok().is_some_and(|(alloc_id, offset, ..)| {
321+
let (size, _align, kind) = this.get_alloc_info(alloc_id);
322+
// FIXME: we should also check mutability
323+
kind == AllocKind::LiveData && offset < size
324+
}) {
325+
// This cannot fail now.
326+
Some(this.get_sync_or_init(ptr, new).unwrap())
327+
} else {
328+
None
329+
}
330+
}
331+
299332
#[inline]
300333
/// Get the id of the thread that currently owns this lock.
301334
fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
@@ -690,7 +723,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
690723
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
691724
fn futex_wait(
692725
&mut self,
693-
addr: u64,
726+
futex_ref: FutexRef,
694727
bitset: u32,
695728
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
696729
retval_succ: Scalar,
@@ -700,23 +733,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
700733
) {
701734
let this = self.eval_context_mut();
702735
let thread = this.active_thread();
703-
let futex = &mut this.machine.sync.futexes.entry(addr).or_default();
736+
let mut futex = futex_ref.0.borrow_mut();
704737
let waiters = &mut futex.waiters;
705738
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
706739
waiters.push_back(FutexWaiter { thread, bitset });
740+
drop(futex);
741+
707742
this.block_thread(
708-
BlockReason::Futex { addr },
743+
BlockReason::Futex,
709744
timeout,
710745
callback!(
711746
@capture<'tcx> {
712-
addr: u64,
747+
futex_ref: FutexRef,
713748
retval_succ: Scalar,
714749
retval_timeout: Scalar,
715750
dest: MPlaceTy<'tcx>,
716751
errno_timeout: IoError,
717752
}
718753
@unblock = |this| {
719-
let futex = this.machine.sync.futexes.get(&addr).unwrap();
754+
let futex = futex_ref.0.borrow();
720755
// Acquire the clock of the futex.
721756
if let Some(data_race) = &this.machine.data_race {
722757
data_race.acquire_clock(&futex.clock, &this.machine.threads);
@@ -728,7 +763,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
728763
@timeout = |this| {
729764
// Remove the waiter from the futex.
730765
let thread = this.active_thread();
731-
let futex = this.machine.sync.futexes.get_mut(&addr).unwrap();
766+
let mut futex = futex_ref.0.borrow_mut();
732767
futex.waiters.retain(|waiter| waiter.thread != thread);
733768
// Set errno and write return value.
734769
this.set_last_error(errno_timeout)?;
@@ -739,12 +774,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
739774
);
740775
}
741776

777+
/// Wake up the first thread in the queue that matches any of the bits in the bitset.
742778
/// Returns whether anything was woken.
743-
fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
779+
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
744780
let this = self.eval_context_mut();
745-
let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else {
746-
return interp_ok(false);
747-
};
781+
let mut futex = futex_ref.0.borrow_mut();
748782
let data_race = &this.machine.data_race;
749783

750784
// Each futex-wake happens-before the end of the futex wait
@@ -757,7 +791,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
757791
return interp_ok(false);
758792
};
759793
let waiter = futex.waiters.remove(i).unwrap();
760-
this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?;
794+
drop(futex);
795+
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
761796
interp_ok(true)
762797
}
763798
}

src/concurrency/thread.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub enum BlockReason {
147147
/// Blocked on a reader-writer lock.
148148
RwLock(RwLockId),
149149
/// Blocked on a Futex variable.
150-
Futex { addr: u64 },
150+
Futex,
151151
/// Blocked on an InitOnce.
152152
InitOnce(InitOnceId),
153153
/// Blocked on epoll.

src/shims/unix/linux/sync.rs

+26-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
use crate::concurrency::sync::FutexRef;
12
use crate::helpers::check_min_arg_count;
23
use crate::*;
34

5+
struct LinuxFutex {
6+
futex: FutexRef,
7+
}
8+
49
/// Implementation of the SYS_futex syscall.
510
/// `args` is the arguments *including* the syscall number.
611
pub fn futex<'tcx>(
@@ -27,7 +32,6 @@ pub fn futex<'tcx>(
2732

2833
// This is a vararg function so we have to bring our own type for this pointer.
2934
let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
30-
let addr_usize = addr.ptr().addr().bytes();
3135

3236
let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG");
3337
let futex_wait = this.eval_libc_i32("FUTEX_WAIT");
@@ -50,6 +54,15 @@ pub fn futex<'tcx>(
5054
op if op & !futex_realtime == futex_wait || op & !futex_realtime == futex_wait_bitset => {
5155
let wait_bitset = op & !futex_realtime == futex_wait_bitset;
5256

57+
// Storing this inside the allocation means that if an address gets reused by a new allocation,
58+
// we'll use an independent futex queue for this... that seems acceptable.
59+
let futex_ref = this
60+
.get_sync_or_init(addr.ptr(), |_| {
61+
interp_ok(LinuxFutex { futex: Default::default() })
62+
})?
63+
.futex
64+
.clone();
65+
5366
let (timeout, bitset) = if wait_bitset {
5467
let [_, _, _, _, timeout, uaddr2, bitset] =
5568
check_min_arg_count("`syscall(SYS_futex, FUTEX_WAIT_BITSET, ...)`", args)?;
@@ -63,8 +76,7 @@ pub fn futex<'tcx>(
6376
};
6477

6578
if bitset == 0 {
66-
this.set_last_error_and_return(LibcError("EINVAL"), dest)?;
67-
return interp_ok(());
79+
return this.set_last_error_and_return(LibcError("EINVAL"), dest);
6880
}
6981

7082
let timeout = this.deref_pointer_as(timeout, this.libc_ty_layout("timespec"))?;
@@ -144,7 +156,7 @@ pub fn futex<'tcx>(
144156
if val == futex_val {
145157
// The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
146158
this.futex_wait(
147-
addr_usize,
159+
futex_ref,
148160
bitset,
149161
timeout,
150162
Scalar::from_target_isize(0, this), // retval_succ
@@ -165,6 +177,15 @@ pub fn futex<'tcx>(
165177
// FUTEX_WAKE_BITSET: (int *addr, int op = FUTEX_WAKE, int val, const timespect *_unused, int *_unused, unsigned int bitset)
166178
// Same as FUTEX_WAKE, but allows you to specify a bitset to select which threads to wake up.
167179
op if op == futex_wake || op == futex_wake_bitset => {
180+
let Some(futex_ref) = this.try_get_sync_or_init(addr.ptr(), |_| {
181+
interp_ok(LinuxFutex { futex: Default::default() })
182+
}) else {
183+
// No AllocId, or no live allocation at that AllocId.
184+
// Return an error code. (That seems nicer than silently doing something non-intuitive.)
185+
return this.set_last_error_and_return(LibcError("EFAULT"), dest);
186+
};
187+
let futex_ref = futex_ref.futex.clone();
188+
168189
let bitset = if op == futex_wake_bitset {
169190
let [_, _, _, _, timeout, uaddr2, bitset] =
170191
check_min_arg_count("`syscall(SYS_futex, FUTEX_WAKE_BITSET, ...)`", args)?;
@@ -184,7 +205,7 @@ pub fn futex<'tcx>(
184205
let mut n = 0;
185206
#[expect(clippy::arithmetic_side_effects)]
186207
for _ in 0..val {
187-
if this.futex_wake(addr_usize, bitset)? {
208+
if this.futex_wake(&futex_ref, bitset)? {
188209
n += 1;
189210
} else {
190211
break;

src/shims/windows/sync.rs

+26-6
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@ use std::time::Duration;
33
use rustc_abi::Size;
44

55
use crate::concurrency::init_once::InitOnceStatus;
6+
use crate::concurrency::sync::FutexRef;
67
use crate::*;
78

89
#[derive(Copy, Clone)]
910
struct WindowsInitOnce {
1011
id: InitOnceId,
1112
}
1213

14+
struct WindowsFutex {
15+
futex: FutexRef,
16+
}
17+
1318
impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {}
1419
trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
1520
// Windows sync primitives are pointer sized.
@@ -168,7 +173,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
168173
let size = this.read_target_usize(size_op)?;
169174
let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?;
170175

171-
let addr = ptr.addr().bytes();
176+
let futex_ref = this
177+
.get_sync_or_init(ptr, |_| interp_ok(WindowsFutex { futex: Default::default() }))?
178+
.futex
179+
.clone();
172180

173181
if size > 8 || !size.is_power_of_two() {
174182
let invalid_param = this.eval_windows("c", "ERROR_INVALID_PARAMETER");
@@ -196,7 +204,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
196204
if futex_val == compare_val {
197205
// If the values are the same, we have to block.
198206
this.futex_wait(
199-
addr,
207+
futex_ref,
200208
u32::MAX, // bitset
201209
timeout,
202210
Scalar::from_i32(1), // retval_succ
@@ -219,8 +227,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
219227
// See the Linux futex implementation for why this fence exists.
220228
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
221229

222-
let addr = ptr.addr().bytes();
223-
this.futex_wake(addr, u32::MAX)?;
230+
let Some(futex_ref) = this
231+
.try_get_sync_or_init(ptr, |_| interp_ok(WindowsFutex { futex: Default::default() }))
232+
else {
233+
// Seems like this cannot return an error, so we just wake nobody.
234+
return interp_ok(());
235+
};
236+
let futex_ref = futex_ref.futex.clone();
237+
this.futex_wake(&futex_ref, u32::MAX)?;
224238

225239
interp_ok(())
226240
}
@@ -232,8 +246,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
232246
// See the Linux futex implementation for why this fence exists.
233247
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
234248

235-
let addr = ptr.addr().bytes();
236-
while this.futex_wake(addr, u32::MAX)? {}
249+
let Some(futex_ref) = this
250+
.try_get_sync_or_init(ptr, |_| interp_ok(WindowsFutex { futex: Default::default() }))
251+
else {
252+
// Seems like this cannot return an error, so we just wake nobody.
253+
return interp_ok(());
254+
};
255+
let futex_ref = futex_ref.futex.clone();
256+
while this.futex_wake(&futex_ref, u32::MAX)? {}
237257

238258
interp_ok(())
239259
}

tests/pass-dep/concurrency/linux-futex.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ fn wake_dangling() {
4141
let ptr: *const i32 = &*futex;
4242
drop(futex);
4343

44-
// Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting.
44+
// Expect error since this is now "unmapped" memory.
4545
unsafe {
46-
assert_eq!(libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, 1), 0);
46+
assert_eq!(libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, 1), -1);
47+
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EFAULT);
4748
}
4849
}
4950

0 commit comments

Comments
 (0)