Skip to content

Commit d5e126e

Browse files
committed
store futexes in per-allocation data rather than globally
1 parent 9cb4f0e commit d5e126e

File tree

7 files changed

+162
-116
lines changed

7 files changed

+162
-116
lines changed

src/concurrency/sync.rs

Lines changed: 116 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::cell::RefCell;
12
use std::collections::VecDeque;
23
use std::collections::hash_map::Entry;
34
use std::ops::Not;
5+
use std::rc::Rc;
46
use std::time::Duration;
57

68
use rustc_data_structures::fx::FxHashMap;
@@ -121,6 +123,15 @@ struct Futex {
121123
clock: VClock,
122124
}
123125

126+
#[derive(Default, Clone)]
127+
pub struct FutexRef(Rc<RefCell<Futex>>);
128+
129+
impl VisitProvenance for FutexRef {
130+
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
131+
// No provenance
132+
}
133+
}
134+
124135
/// A thread waiting on a futex.
125136
#[derive(Debug)]
126137
struct FutexWaiter {
@@ -137,9 +148,6 @@ pub struct SynchronizationObjects {
137148
rwlocks: IndexVec<RwLockId, RwLock>,
138149
condvars: IndexVec<CondvarId, Condvar>,
139150
pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
140-
141-
/// Futex info for the futex at the given address.
142-
futexes: FxHashMap<u64, Futex>,
143151
}
144152

145153
// Private extension trait for local helper methods
@@ -184,7 +192,7 @@ impl SynchronizationObjects {
184192
}
185193

186194
impl<'tcx> AllocExtra<'tcx> {
187-
pub fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
195+
fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
188196
self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>())
189197
}
190198
}
@@ -193,75 +201,104 @@ impl<'tcx> AllocExtra<'tcx> {
193201
/// If `init` is set to this, we consider the primitive initialized.
194202
pub const LAZY_INIT_COOKIE: u32 = 0xcafe_affe;
195203

196-
/// Helper for lazily initialized `alloc_extra.sync` data:
197-
/// this forces an immediate init.
198-
pub fn lazy_sync_init<'tcx, T: 'static + Copy>(
199-
ecx: &mut MiriInterpCx<'tcx>,
200-
primitive: &MPlaceTy<'tcx>,
201-
init_offset: Size,
202-
data: T,
203-
) -> InterpResult<'tcx> {
204-
let (alloc, offset, _) = ecx.ptr_get_alloc_id(primitive.ptr(), 0)?;
205-
let (alloc_extra, _machine) = ecx.get_alloc_extra_mut(alloc)?;
206-
alloc_extra.sync.insert(offset, Box::new(data));
207-
// Mark this as "initialized".
208-
let init_field = primitive.offset(init_offset, ecx.machine.layouts.u32, ecx)?;
209-
ecx.write_scalar_atomic(
210-
Scalar::from_u32(LAZY_INIT_COOKIE),
211-
&init_field,
212-
AtomicWriteOrd::Relaxed,
213-
)?;
214-
interp_ok(())
215-
}
216-
217-
/// Helper for lazily initialized `alloc_extra.sync` data:
218-
/// Checks if the primitive is initialized, and return its associated data if so.
219-
/// Otherwise, calls `new_data` to initialize the primitive.
220-
pub fn lazy_sync_get_data<'tcx, T: 'static + Copy>(
221-
ecx: &mut MiriInterpCx<'tcx>,
222-
primitive: &MPlaceTy<'tcx>,
223-
init_offset: Size,
224-
name: &str,
225-
new_data: impl FnOnce(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, T>,
226-
) -> InterpResult<'tcx, T> {
227-
// Check if this is already initialized. Needs to be atomic because we can race with another
228-
// thread initializing. Needs to be an RMW operation to ensure we read the *latest* value.
229-
// So we just try to replace MUTEX_INIT_COOKIE with itself.
230-
let init_cookie = Scalar::from_u32(LAZY_INIT_COOKIE);
231-
let init_field = primitive.offset(init_offset, ecx.machine.layouts.u32, ecx)?;
232-
let (_init, success) = ecx
233-
.atomic_compare_exchange_scalar(
234-
&init_field,
235-
&ImmTy::from_scalar(init_cookie, ecx.machine.layouts.u32),
236-
init_cookie,
237-
AtomicRwOrd::Relaxed,
238-
AtomicReadOrd::Relaxed,
239-
/* can_fail_spuriously */ false,
240-
)?
241-
.to_scalar_pair();
242-
243-
if success.to_bool()? {
244-
// If it is initialized, it must be found in the "sync primitive" table,
245-
// or else it has been moved illegally.
246-
let (alloc, offset, _) = ecx.ptr_get_alloc_id(primitive.ptr(), 0)?;
247-
let alloc_extra = ecx.get_alloc_extra(alloc)?;
248-
let data = alloc_extra
249-
.get_sync::<T>(offset)
250-
.ok_or_else(|| err_ub_format!("`{name}` can't be moved after first use"))?;
251-
interp_ok(*data)
252-
} else {
253-
let data = new_data(ecx)?;
254-
lazy_sync_init(ecx, primitive, init_offset, data)?;
255-
interp_ok(data)
256-
}
257-
}
258-
259204
// Public interface to synchronization primitives. Please note that in most
260205
// cases, the function calls are infallible and it is the client's (shim
261206
// implementation's) responsibility to detect and deal with erroneous
262207
// situations.
263208
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
264209
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
210+
/// Helper for lazily initialized `alloc_extra.sync` data:
211+
/// this forces an immediate init.
212+
fn lazy_sync_init<T: 'static + Copy>(
213+
&mut self,
214+
primitive: &MPlaceTy<'tcx>,
215+
init_offset: Size,
216+
data: T,
217+
) -> InterpResult<'tcx> {
218+
let this = self.eval_context_mut();
219+
220+
let (alloc, offset, _) = this.ptr_get_alloc_id(primitive.ptr(), 0)?;
221+
let (alloc_extra, _machine) = this.get_alloc_extra_mut(alloc)?;
222+
alloc_extra.sync.insert(offset, Box::new(data));
223+
// Mark this as "initialized".
224+
let init_field = primitive.offset(init_offset, this.machine.layouts.u32, this)?;
225+
this.write_scalar_atomic(
226+
Scalar::from_u32(LAZY_INIT_COOKIE),
227+
&init_field,
228+
AtomicWriteOrd::Relaxed,
229+
)?;
230+
interp_ok(())
231+
}
232+
233+
/// Helper for lazily initialized `alloc_extra.sync` data:
234+
/// Checks if the primitive is initialized, and returns its associated data if so.
235+
/// Otherwise, calls `new_data` to initialize the primitive.
236+
fn lazy_sync_get_data<T: 'static + Copy>(
237+
&mut self,
238+
primitive: &MPlaceTy<'tcx>,
239+
init_offset: Size,
240+
name: &str,
241+
new_data: impl FnOnce(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, T>,
242+
) -> InterpResult<'tcx, T> {
243+
let this = self.eval_context_mut();
244+
245+
// Check if this is already initialized. Needs to be atomic because we can race with another
246+
// thread initializing. Needs to be an RMW operation to ensure we read the *latest* value.
247+
// So we just try to replace LAZY_INIT_COOKIE with itself.
248+
let init_cookie = Scalar::from_u32(LAZY_INIT_COOKIE);
249+
let init_field = primitive.offset(init_offset, this.machine.layouts.u32, this)?;
250+
let (_init, success) = this
251+
.atomic_compare_exchange_scalar(
252+
&init_field,
253+
&ImmTy::from_scalar(init_cookie, this.machine.layouts.u32),
254+
init_cookie,
255+
AtomicRwOrd::Relaxed,
256+
AtomicReadOrd::Relaxed,
257+
/* can_fail_spuriously */ false,
258+
)?
259+
.to_scalar_pair();
260+
261+
if success.to_bool()? {
262+
// If it is initialized, it must be found in the "sync primitive" table,
263+
// or else it has been moved illegally.
264+
let (alloc, offset, _) = this.ptr_get_alloc_id(primitive.ptr(), 0)?;
265+
let alloc_extra = this.get_alloc_extra(alloc)?;
266+
let data = alloc_extra
267+
.get_sync::<T>(offset)
268+
.ok_or_else(|| err_ub_format!("`{name}` can't be moved after first use"))?;
269+
interp_ok(*data)
270+
} else {
271+
let data = new_data(this)?;
272+
this.lazy_sync_init(primitive, init_offset, data)?;
273+
interp_ok(data)
274+
}
275+
}
276+
277+
/// Get the synchronization primitive associated with the given pointer,
278+
/// or initialize a new one.
279+
fn get_sync_or_init<'a, T: 'static>(
280+
&'a mut self,
281+
ptr: Pointer,
282+
new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> InterpResult<'tcx, T>,
283+
) -> InterpResult<'tcx, &'a T>
284+
where
285+
'tcx: 'a,
286+
{
287+
let this = self.eval_context_mut();
288+
// Ensure there is memory behind this pointer, so that this allocation
289+
// is truly the only place where the data could be stored.
290+
this.check_ptr_access(ptr, Size::from_bytes(1), CheckInAllocMsg::InboundsTest)?;
291+
292+
let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0)?;
293+
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?;
294+
// Due to borrow checker reasons, we have to do the lookup twice.
295+
if alloc_extra.get_sync::<T>(offset).is_none() {
296+
let new = new(machine)?;
297+
alloc_extra.sync.insert(offset, Box::new(new));
298+
}
299+
interp_ok(alloc_extra.get_sync::<T>(offset).unwrap())
300+
}
301+
265302
#[inline]
266303
/// Get the id of the thread that currently owns this lock.
267304
fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
@@ -656,7 +693,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
656693
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
657694
fn futex_wait(
658695
&mut self,
659-
addr: u64,
696+
futex_ref: FutexRef,
660697
bitset: u32,
661698
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
662699
retval_succ: Scalar,
@@ -666,23 +703,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
666703
) {
667704
let this = self.eval_context_mut();
668705
let thread = this.active_thread();
669-
let futex = &mut this.machine.sync.futexes.entry(addr).or_default();
706+
let mut futex = futex_ref.0.borrow_mut();
670707
let waiters = &mut futex.waiters;
671708
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
672709
waiters.push_back(FutexWaiter { thread, bitset });
710+
drop(futex);
711+
673712
this.block_thread(
674-
BlockReason::Futex { addr },
713+
BlockReason::Futex,
675714
timeout,
676715
callback!(
677716
@capture<'tcx> {
678-
addr: u64,
717+
futex_ref: FutexRef,
679718
retval_succ: Scalar,
680719
retval_timeout: Scalar,
681720
dest: MPlaceTy<'tcx>,
682721
errno_timeout: Scalar,
683722
}
684723
@unblock = |this| {
685-
let futex = this.machine.sync.futexes.get(&addr).unwrap();
724+
let futex = futex_ref.0.borrow();
686725
// Acquire the clock of the futex.
687726
if let Some(data_race) = &this.machine.data_race {
688727
data_race.acquire_clock(&futex.clock, &this.machine.threads);
@@ -694,7 +733,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
694733
@timeout = |this| {
695734
// Remove the waiter from the futex.
696735
let thread = this.active_thread();
697-
let futex = this.machine.sync.futexes.get_mut(&addr).unwrap();
736+
let mut futex = futex_ref.0.borrow_mut();
698737
futex.waiters.retain(|waiter| waiter.thread != thread);
699738
// Set errno and write return value.
700739
this.set_last_error(errno_timeout)?;
@@ -706,11 +745,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
706745
}
707746

708747
/// Returns whether anything was woken.
709-
fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
748+
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
710749
let this = self.eval_context_mut();
711-
let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else {
712-
return interp_ok(false);
713-
};
750+
let mut futex = futex_ref.0.borrow_mut();
714751
let data_race = &this.machine.data_race;
715752

716753
// Each futex-wake happens-before the end of the futex wait
@@ -723,7 +760,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
723760
return interp_ok(false);
724761
};
725762
let waiter = futex.waiters.remove(i).unwrap();
726-
this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?;
763+
drop(futex);
764+
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
727765
interp_ok(true)
728766
}
729767
}

src/concurrency/thread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ pub enum BlockReason {
168168
/// Blocked on a reader-writer lock.
169169
RwLock(RwLockId),
170170
/// Blocked on a Futex variable.
171-
Futex { addr: u64 },
171+
Futex,
172172
/// Blocked on an InitOnce.
173173
InitOnce(InitOnceId),
174174
/// Blocked on epoll.

src/shims/unix/linux/sync.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
use crate::concurrency::sync::FutexRef;
12
use crate::*;
23

4+
struct LinuxFutex {
5+
futex: FutexRef,
6+
}
7+
38
/// Implementation of the SYS_futex syscall.
49
/// `args` is the arguments *after* the syscall number.
510
pub fn futex<'tcx>(
@@ -29,9 +34,15 @@ pub fn futex<'tcx>(
2934
let op = this.read_scalar(op)?.to_i32()?;
3035
let val = this.read_scalar(val)?.to_i32()?;
3136

37+
// Storing this inside the allocation means that if an address gets reused by a new allocation,
38+
// we'll use an independent futex queue for this... that seems acceptable.
39+
let futex_ref = this
40+
.get_sync_or_init(addr, |_| interp_ok(LinuxFutex { futex: Default::default() }))?
41+
.futex
42+
.clone();
43+
3244
// This is a vararg function so we have to bring our own type for this pointer.
3345
let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
34-
let addr_usize = addr.ptr().addr().bytes();
3546

3647
let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG");
3748
let futex_wait = this.eval_libc_i32("FUTEX_WAIT");
@@ -159,7 +170,7 @@ pub fn futex<'tcx>(
159170
if val == futex_val {
160171
// The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
161172
this.futex_wait(
162-
addr_usize,
173+
futex_ref,
163174
bitset,
164175
timeout,
165176
Scalar::from_target_isize(0, this), // retval_succ
@@ -207,7 +218,7 @@ pub fn futex<'tcx>(
207218
let mut n = 0;
208219
#[allow(clippy::arithmetic_side_effects)]
209220
for _ in 0..val {
210-
if this.futex_wake(addr_usize, bitset)? {
221+
if this.futex_wake(&futex_ref, bitset)? {
211222
n += 1;
212223
} else {
213224
break;

src/shims/unix/macos/sync.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,11 @@ trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
2323
let lock = this.deref_pointer(lock_ptr)?;
2424
// We store the mutex ID in the `sync` metadata. This means that when the lock is moved,
2525
// that's just implicitly creating a new lock at the new location.
26-
let (alloc, offset, _) = this.ptr_get_alloc_id(lock.ptr(), 0)?;
27-
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?;
28-
if let Some(data) = alloc_extra.get_sync::<MacOsUnfairLock>(offset) {
29-
interp_ok(data.id)
30-
} else {
26+
let data = this.get_sync_or_init(lock.ptr(), |machine| {
3127
let id = machine.sync.mutex_create();
32-
alloc_extra.sync.insert(offset, Box::new(MacOsUnfairLock { id }));
33-
interp_ok(id)
34-
}
28+
interp_ok(MacOsUnfairLock { id })
29+
})?;
30+
interp_ok(data.id)
3531
}
3632
}
3733

0 commit comments

Comments
 (0)