diff --git a/src/bin/miri.rs b/src/bin/miri.rs index 1117b69116..3aa33bf1de 100644 --- a/src/bin/miri.rs +++ b/src/bin/miri.rs @@ -282,6 +282,16 @@ fn main() { }; miri_config.tracked_alloc_id = Some(miri::AllocId(id)); } + arg if arg.starts_with("-Zmiri-max-yield-iterations=") => { + let count: u32 = match arg.strip_prefix("-Zmiri-max-yield-iterations=").unwrap().parse() { + Ok(id) => id, + Err(err) => panic!( + "-Zmiri-max-yield-iterations= requires a valid `u32` argument: {}", + err + ), + }; + miri_config.max_yield_count = count; + } _ => { // Forward to rustc. rustc_args.push(arg); diff --git a/src/data_race.rs b/src/data_race.rs index 44cce53957..48a918ff47 100644 --- a/src/data_race.rs +++ b/src/data_race.rs @@ -76,7 +76,7 @@ use rustc_target::abi::Size; use crate::{ ImmTy, Immediate, InterpResult, MPlaceTy, MemPlaceMeta, MiriEvalContext, MiriEvalContextExt, OpTy, Pointer, RangeMap, ScalarMaybeUninit, Tag, ThreadId, VClock, VTimestamp, - VectorIdx, MemoryKind, MiriMemoryKind + VectorIdx, MemoryKind, MiriMemoryKind, ThreadsEvalContextExt }; pub type AllocExtra = VClockAlloc; @@ -445,13 +445,13 @@ impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for MiriEvalContext<'mir, 'tcx pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { /// Atomic variant of read_scalar_at_offset. fn read_scalar_at_offset_atomic( - &self, + &mut self, op: OpTy<'tcx, Tag>, offset: u64, layout: TyAndLayout<'tcx>, atomic: AtomicReadOp, ) -> InterpResult<'tcx, ScalarMaybeUninit> { - let this = self.eval_context_ref(); + let this = self.eval_context_mut(); let op_place = this.deref_operand(op)?; let offset = Size::from_bytes(offset); @@ -482,7 +482,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { /// Perform an atomic read operation at the memory location. fn read_scalar_atomic( - &self, + &mut self, place: MPlaceTy<'tcx, Tag>, atomic: AtomicReadOp, ) -> InterpResult<'tcx, ScalarMaybeUninit> { @@ -582,15 +582,16 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { /// Update the data-race detector for an atomic read occurring at the /// associated memory-place and on the current thread. 
fn validate_atomic_load( - &self, + &mut self, place: MPlaceTy<'tcx, Tag>, atomic: AtomicReadOp, ) -> InterpResult<'tcx> { - let this = self.eval_context_ref(); + let this = self.eval_context_mut(); this.validate_atomic_op( place, atomic, "Atomic Load", + false, move |memory, clocks, index, atomic| { if atomic == AtomicReadOp::Relaxed { memory.load_relaxed(&mut *clocks, index) @@ -608,11 +609,12 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { place: MPlaceTy<'tcx, Tag>, atomic: AtomicWriteOp, ) -> InterpResult<'tcx> { - let this = self.eval_context_ref(); + let this = self.eval_context_mut(); this.validate_atomic_op( place, atomic, "Atomic Store", + true, move |memory, clocks, index, atomic| { if atomic == AtomicWriteOp::Relaxed { memory.store_relaxed(clocks, index) @@ -633,8 +635,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { use AtomicRwOp::*; let acquire = matches!(atomic, Acquire | AcqRel | SeqCst); let release = matches!(atomic, Release | AcqRel | SeqCst); - let this = self.eval_context_ref(); - this.validate_atomic_op(place, atomic, "Atomic RMW", move |memory, clocks, index, _| { + let this = self.eval_context_mut(); + this.validate_atomic_op( + place, + atomic, + "Atomic RMW", + // For yields the atomic write overrides all effects of the atomic read + // so it is treated as an atomic write. + true, + move |memory, clocks, index, _| { if acquire { memory.load_acquire(clocks, index)?; } else { @@ -961,10 +970,11 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { /// FIXME: is this valid, or should get_raw_mut be used for /// atomic-stores/atomic-rmw? fn validate_atomic_op( - &self, + &mut self, place: MPlaceTy<'tcx, Tag>, atomic: A, description: &str, + write: bool, mut op: impl FnMut( &mut MemoryCellClocks, &mut ThreadClockSet, @@ -972,7 +982,18 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriEvalContextExt<'mir, 'tcx> { A, ) -> Result<(), DataRace>, ) -> InterpResult<'tcx> { - let this = self.eval_context_ref(); + let this = self.eval_context_mut(); + + // Update yield metadata for yield based forward progress and live-lock detection. + let place_ptr = place.ptr.assert_ptr(); + let size = place.layout.size; + if write { + this.thread_yield_atomic_wake(place_ptr.alloc_id, place_ptr.offset,size); + } else { + let alloc_size = this.memory.get_raw(place_ptr.alloc_id)?.size; + this.thread_yield_atomic_watch(place_ptr.alloc_id, alloc_size, place_ptr.offset,size); + } + if let Some(data_race) = &this.memory.extra.data_race { if data_race.multi_threaded.get() { // Load and log the atomic operation. diff --git a/src/diagnostics.rs b/src/diagnostics.rs index 3f1f67218f..baefd25df2 100644 --- a/src/diagnostics.rs +++ b/src/diagnostics.rs @@ -16,6 +16,7 @@ pub enum TerminationInfo { UnsupportedInIsolation(String), ExperimentalUb { msg: String, url: String }, Deadlock, + Livelock, } impl fmt::Display for TerminationInfo { @@ -32,6 +33,8 @@ impl fmt::Display for TerminationInfo { write!(f, "{}", msg), Deadlock => write!(f, "the evaluated program deadlocked"), + Livelock => + write!(f, "the evaluated program livelocked"), } } } @@ -67,6 +70,7 @@ pub fn report_error<'tcx, 'mir>( ExperimentalUb { .. 
} => "Undefined Behavior", Deadlock => "deadlock", + Livelock => "livelock", }; let helps = match info { UnsupportedInIsolation(_) => @@ -76,6 +80,12 @@ pub fn report_error<'tcx, 'mir>( format!("this indicates a potential bug in the program: it performed an invalid operation, but the rules it violated are still experimental"), format!("see {} for further information", url), ], + Livelock => vec![ + format!("All thread yields or spin loop hints are used to dynamically generated watch sets to detect livelock and avoid unnecessary spins in spin-loops."), + format!("If this should not have reported a livelock then it may help to change the maximum number of spurious wakes from yields with no progress."), + format!("Pass the argument `-Zmiri-max-yield-iterations=` to change this value."), + format!("The current value is {}, a value of 0 will allow an unlimited number of spurious wakes.", ecx.thread_yield_get_max_spurious_wake()) + ], _ => vec![], }; (title, helps) diff --git a/src/eval.rs b/src/eval.rs index 0a62f14dd3..2cc6629d66 100644 --- a/src/eval.rs +++ b/src/eval.rs @@ -50,6 +50,9 @@ pub struct MiriConfig { pub track_raw: bool, /// Determine if data race detection should be enabled pub data_race_detector: bool, + /// Maximum number of consecutive yield operations with no progress + /// to consider a thread live-locked. A value of zero never live-locks. + pub max_yield_count: u32, } impl Default for MiriConfig { @@ -68,6 +71,7 @@ impl Default for MiriConfig { tracked_alloc_id: None, track_raw: false, data_race_detector: true, + max_yield_count: 1 } } } @@ -87,7 +91,7 @@ pub fn create_ecx<'mir, 'tcx: 'mir>( tcx, rustc_span::source_map::DUMMY_SP, param_env, - Evaluator::new(config.communicate, config.validate, layout_cx), + Evaluator::new(config.communicate, config.validate, config.max_yield_count, layout_cx), MemoryExtra::new(&config), ); // Complete initialization. diff --git a/src/lib.rs b/src/lib.rs index 581da0976e..a9e29c6d86 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ #![feature(never_type)] #![feature(or_patterns)] #![feature(try_blocks)] +#![feature(hash_drain_filter)] #![warn(rust_2018_idioms)] #![allow(clippy::cast_lossless)] diff --git a/src/machine.rs b/src/machine.rs index e639bf450a..d53730d4cb 100644 --- a/src/machine.rs +++ b/src/machine.rs @@ -275,6 +275,7 @@ impl<'mir, 'tcx> Evaluator<'mir, 'tcx> { pub(crate) fn new( communicate: bool, validate: bool, + max_yield_count: u32, layout_cx: LayoutCx<'tcx, TyCtxt<'tcx>>, ) -> Self { let layouts = PrimitiveLayouts::new(layout_cx) @@ -293,7 +294,7 @@ impl<'mir, 'tcx> Evaluator<'mir, 'tcx> { dir_handler: Default::default(), time_anchor: Instant::now(), layouts, - threads: ThreadManager::default(), + threads: ThreadManager::new(max_yield_count), static_roots: Vec::new(), } } diff --git a/src/shims/foreign_items.rs b/src/shims/foreign_items.rs index d588e2962a..cddfae8d12 100644 --- a/src/shims/foreign_items.rs +++ b/src/shims/foreign_items.rs @@ -222,6 +222,12 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx "miri_resolve_frame" => { this.handle_miri_resolve_frame(args, dest)?; } + // Performs a thread yield. + // Exists since a thread yield operation may not be available on a given platform. 
+ "miri_yield_thread" => { + let &[] = check_arg_count(args)?; + this.yield_active_thread(); + } // Standard C allocation diff --git a/src/shims/posix/sync.rs b/src/shims/posix/sync.rs index 868c72289a..a5bb7258d7 100644 --- a/src/shims/posix/sync.rs +++ b/src/shims/posix/sync.rs @@ -81,7 +81,7 @@ fn mutex_set_kind<'mir, 'tcx: 'mir>( } fn mutex_get_id<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, + ecx: &mut MiriEvalContext<'mir, 'tcx>, mutex_op: OpTy<'tcx, Tag>, ) -> InterpResult<'tcx, ScalarMaybeUninit> { ecx.read_scalar_at_offset_atomic( @@ -125,7 +125,7 @@ fn mutex_get_or_create_id<'mir, 'tcx: 'mir>( // bytes 4-7: rwlock id as u32 or 0 if id is not assigned yet. fn rwlock_get_id<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, + ecx: &mut MiriEvalContext<'mir, 'tcx>, rwlock_op: OpTy<'tcx, Tag>, ) -> InterpResult<'tcx, ScalarMaybeUninit> { ecx.read_scalar_at_offset_atomic( @@ -192,7 +192,7 @@ fn condattr_set_clock_id<'mir, 'tcx: 'mir>( // bytes 8-11: the clock id constant as i32 fn cond_get_id<'mir, 'tcx: 'mir>( - ecx: &MiriEvalContext<'mir, 'tcx>, + ecx: &mut MiriEvalContext<'mir, 'tcx>, cond_op: OpTy<'tcx, Tag>, ) -> InterpResult<'tcx, ScalarMaybeUninit> { ecx.read_scalar_at_offset_atomic( diff --git a/src/sync.rs b/src/sync.rs index 4d488565fa..e19f583b50 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -217,20 +217,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx #[inline] /// Get the id of the thread that currently owns this lock. fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId { - let this = self.eval_context_ref(); + let this = self.eval_context_mut(); + this.thread_yield_mutex_watch(id); this.machine.threads.sync.mutexes[id].owner.unwrap() } #[inline] /// Check if locked. - fn mutex_is_locked(&self, id: MutexId) -> bool { - let this = self.eval_context_ref(); + fn mutex_is_locked(&mut self, id: MutexId) -> bool { + let this = self.eval_context_mut(); + this.thread_yield_mutex_watch(id); this.machine.threads.sync.mutexes[id].owner.is_some() } /// Lock by setting the mutex owner and increasing the lock count. fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) { let this = self.eval_context_mut(); + this.thread_yield_mutex_wake(id); let mutex = &mut this.machine.threads.sync.mutexes[id]; if let Some(current_owner) = mutex.owner { assert_eq!(thread, current_owner, "mutex already locked by another thread"); @@ -257,6 +260,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx expected_owner: ThreadId, ) -> Option { let this = self.eval_context_mut(); + this.thread_yield_mutex_wake(id); let mutex = &mut this.machine.threads.sync.mutexes[id]; if let Some(current_owner) = mutex.owner { // Mutex is locked. @@ -288,7 +292,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// Put the thread into the queue waiting for the mutex. fn mutex_enqueue_and_block(&mut self, id: MutexId, thread: ThreadId) { let this = self.eval_context_mut(); - assert!(this.mutex_is_locked(id), "queing on unlocked mutex"); + assert!(this.mutex_is_locked(id), "queuing on unlocked mutex"); this.machine.threads.sync.mutexes[id].queue.push_back(thread); this.block_thread(thread); } @@ -302,8 +306,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx #[inline] /// Check if locked. 
- fn rwlock_is_locked(&self, id: RwLockId) -> bool { - let this = self.eval_context_ref(); + fn rwlock_is_locked(&mut self, id: RwLockId) -> bool { + let this = self.eval_context_mut(); + this.thread_yield_rwlock_watch(id); let rwlock = &this.machine.threads.sync.rwlocks[id]; trace!( "rwlock_is_locked: {:?} writer is {:?} and there are {} reader threads (some of which could hold multiple read locks)", @@ -314,8 +319,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx #[inline] /// Check if write locked. - fn rwlock_is_write_locked(&self, id: RwLockId) -> bool { - let this = self.eval_context_ref(); + fn rwlock_is_write_locked(&mut self, id: RwLockId) -> bool { + let this = self.eval_context_mut(); + this.thread_yield_rwlock_watch(id); let rwlock = &this.machine.threads.sync.rwlocks[id]; trace!("rwlock_is_write_locked: {:?} writer is {:?}", id, rwlock.writer); rwlock.writer.is_some() @@ -325,6 +331,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// this lock. fn rwlock_reader_lock(&mut self, id: RwLockId, reader: ThreadId) { let this = self.eval_context_mut(); + this.thread_yield_rwlock_wake(id); assert!(!this.rwlock_is_write_locked(id), "the lock is write locked"); trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, reader); let rwlock = &mut this.machine.threads.sync.rwlocks[id]; @@ -339,6 +346,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// Returns `true` if succeeded, `false` if this `reader` did not hold the lock. fn rwlock_reader_unlock(&mut self, id: RwLockId, reader: ThreadId) -> bool { let this = self.eval_context_mut(); + this.thread_yield_rwlock_wake(id); let rwlock = &mut this.machine.threads.sync.rwlocks[id]; match rwlock.readers.entry(reader) { Entry::Occupied(mut entry) => { @@ -388,6 +396,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// Lock by setting the writer that owns the lock. fn rwlock_writer_lock(&mut self, id: RwLockId, writer: ThreadId) { let this = self.eval_context_mut(); + this.thread_yield_rwlock_wake(id); assert!(!this.rwlock_is_locked(id), "the rwlock is already locked"); trace!("rwlock_writer_lock: {:?} now held by {:?}", id, writer); let rwlock = &mut this.machine.threads.sync.rwlocks[id]; @@ -401,6 +410,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// Try to unlock by removing the writer. fn rwlock_writer_unlock(&mut self, id: RwLockId, expected_writer: ThreadId) -> bool { let this = self.eval_context_mut(); + this.thread_yield_rwlock_wake(id); let rwlock = &mut this.machine.threads.sync.rwlocks[id]; if let Some(current_writer) = rwlock.writer { if current_writer != expected_writer { @@ -459,12 +469,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// Is the conditional variable awaited? fn condvar_is_awaited(&mut self, id: CondvarId) -> bool { let this = self.eval_context_mut(); + this.thread_yield_condvar_watch(id); !this.machine.threads.sync.condvars[id].waiters.is_empty() } /// Mark that the thread is waiting on the conditional variable. 
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, mutex: MutexId) { let this = self.eval_context_mut(); + this.thread_yield_condvar_wake(id); let waiters = &mut this.machine.threads.sync.condvars[id].waiters; assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); waiters.push_back(CondvarWaiter { thread, mutex }); @@ -474,6 +486,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// variable. fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> { let this = self.eval_context_mut(); + this.thread_yield_condvar_wake(id); let current_thread = this.get_active_thread(); let condvar = &mut this.machine.threads.sync.condvars[id]; let data_race = &this.memory.extra.data_race; @@ -496,11 +509,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx /// Remove the thread from the queue of threads waiting on this conditional variable. fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) { let this = self.eval_context_mut(); + this.thread_yield_condvar_wake(id); this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread); } fn futex_wait(&mut self, addr: Pointer, thread: ThreadId) { let this = self.eval_context_mut(); + this.thread_yield_concurrent_progress(); let futex = &mut this.machine.threads.sync.futexes.entry(addr.erase_tag()).or_default(); let waiters = &mut futex.waiters; assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); @@ -509,6 +524,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx fn futex_wake(&mut self, addr: Pointer) -> Option { let this = self.eval_context_mut(); + this.thread_yield_concurrent_progress(); let current_thread = this.get_active_thread(); let futex = &mut this.machine.threads.sync.futexes.get_mut(&addr.erase_tag())?; let data_race = &this.memory.extra.data_race; diff --git a/src/thread.rs b/src/thread.rs index 5d78343041..35f8c3b135 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -9,11 +9,12 @@ use std::time::{Duration, Instant, SystemTime}; use log::trace; -use rustc_data_structures::fx::FxHashMap; +use rustc_data_structures::fx::{FxHashMap, FxHashSet}; use rustc_hir::def_id::DefId; use rustc_index::vec::{Idx, IndexVec}; +use rustc_target::abi::Size; -use crate::sync::SynchronizationState; +use crate::sync::{SynchronizationState, CondvarId}; use crate::*; #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -87,6 +88,15 @@ pub enum ThreadState { /// responsibility of the synchronization primitives to track threads that /// are blocked by them. BlockedOnSync, + /// The thread has yielded, but the full number of validation iterations + /// has not yet occurred. + /// Therefore this thread can be awoken if there are no enabled threads + /// available. + DelayOnYield, + /// The thread has fully yielded, signalling that it requires another thread + /// to perform an action visible to it in order to make progress. + /// If all threads are in this state then a live-lock is reported. + BlockedOnYield, /// The thread has terminated its execution. We do not delete terminated /// threads (FIXME: why?). Terminated, @@ -104,10 +114,182 @@ enum ThreadJoinStatus { Joined, } +/// Set of sync objects that can have properties queried. +/// The futex is not included since it can only signal +/// and wake waiting threads. +#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] +enum SyncObject { + /// Can query lock state.
+ Mutex(MutexId), + /// Can query lock state. + RwLock(RwLockId), + /// Can query whether it is awaited. + Condvar(CondvarId), + +} + +enum YieldRecordState { + + /// The thread has made progress and so the recording of + /// states has terminated. + /// A thread is considered to have made progress if it performs some + /// action that is visible to another thread. + /// Examples include a release-store, mutex-unlock, + /// thread-termination and release-fence. + MadeProgress, + + /// The thread is currently recording the variable watch + /// state for the yield operation; this watch operation + /// can occur once or multiple times to validate that the yield + /// is correct. + /// Multiple iterations can prevent reporting live-lock + /// in yield loops with a finite number of iterations. + Recording { + /// Map of global memory state to a record index. + /// If set to zero then this location is not watched; + /// a value of 1 means that it was watched on the first + /// iteration, and a value of x >= 2 means that the value + /// was watched on iterations 1, 2, .., x. + watch_atomic: FxHashMap<AllocId, RangeMap<u32>>, + + /// Map of synchronization objects that can have properties + /// queried and are currently watched by this thread. + watch_sync: FxHashMap<SyncObject, u32>, + + /// The current iteration of the yield live-lock loop + /// recording; it should always be less than or equal + /// to the global live-lock loop counter. + record_iteration: u32, + } +} + +impl YieldRecordState { + + /// Progress has been made on the thread. + fn on_progress(&mut self) { + *self = YieldRecordState::MadeProgress; + } + + /// Mark an atomic variable as watched. + fn on_watch_atomic(&mut self, alloc_id: AllocId, alloc_size: Size, offset: Size, len: Size) { + if let YieldRecordState::Recording { + watch_atomic, record_iteration, .. + } = self { + let range_map = watch_atomic.entry(alloc_id) + .or_insert_with(|| RangeMap::new(alloc_size, 0)); + let mut assume_progress = false; + range_map.iter_mut(offset, len) + .for_each(|(_, watch)| { + if *watch < *record_iteration - 1 { + // Value stored does not match the last loop, + // so assume some progress has been made. + assume_progress = true; + } + *watch = *record_iteration + }); + + // The watch set is different, so assume progress has been made. + if assume_progress { + log::trace!("Atomic watch with different value at {:?}, {:?}, {:?}", alloc_id, offset, len); + *self = YieldRecordState::MadeProgress; + } + } + } + + /// Mark a synchronization object as watched. + fn on_watch_sync(&mut self, sync: SyncObject) { + if let YieldRecordState::Recording { + watch_sync, record_iteration, .. + } = self { + let count = watch_sync.entry(sync).or_insert(0); + if *count < *record_iteration - 1 { + // Different content - assume progress. + log::trace!("Sync watch with different value at {:?}", sync); + *self = YieldRecordState::MadeProgress; + } else { + *count = *record_iteration; + } + } + } + + /// Returns true if the atomic variable is currently watched. + fn should_wake_atomic(&self, alloc_id: AllocId, offset: Size, len: Size) -> bool { + if let YieldRecordState::Recording { + watch_atomic, .. + } = self { + if let Some(range_map) = watch_atomic.get(&alloc_id) { + range_map.iter(offset, len).any(|(_, &watch)| watch != 0) + } else { + false + } + } else { + // First iteration of the yield, no wake metadata, + // so only awaken after there are no enabled threads. + false + } + } + + /// Returns true if the sync object is currently watched.
+ fn should_wake_sync(&self, sync: SyncObject) -> bool { + if let YieldRecordState::Recording { + watch_sync, .. + } = self { + if let Some(count) = watch_sync.get(&sync) { + *count != 0 + } else { + false + } + } else { + // First iteration, no wake metadata. + false + } + } + + /// Returns the number of yield iterations that have been executed. + fn get_iteration_count(&self) -> u32 { + if let YieldRecordState::Recording { + record_iteration, .. + } = self { + *record_iteration + } else { + 0 + } + } + + fn should_watch(&self) -> bool { + if let YieldRecordState::Recording { + watch_atomic, watch_sync, .. + } = self { + // Should watch if either watch hash-set is non-empty + !watch_atomic.is_empty() || !watch_sync.is_empty() + } else { + false + } + } + + /// Starts the next yield iteration + fn start_iteration(&mut self) { + if let YieldRecordState::Recording { + record_iteration, .. + } = self { + *record_iteration += 1; + } else { + *self = YieldRecordState::Recording { + watch_atomic: FxHashMap::default(), + watch_sync: FxHashMap::default(), + record_iteration: 1 + } + } + } +} + /// A thread. pub struct Thread<'mir, 'tcx> { state: ThreadState, + /// Metadata for blocking on yield operations + yield_state: YieldRecordState, + /// Name of the thread. thread_name: Option>, @@ -147,6 +329,25 @@ impl<'mir, 'tcx> Thread<'mir, 'tcx> { b"" } } + + /// Start the thread yielding. Returns true if this thread has watch metadata. + fn on_yield(&mut self, max_yield: u32) -> bool { + let iteration_count = self.yield_state.get_iteration_count(); + let block = if max_yield == 0 { + // A value of 0 never blocks + false + } else { + iteration_count >= max_yield + }; + if block { + log::trace!("Thread entered Blocking yield"); + self.state = ThreadState::BlockedOnYield; + } else { + log::trace!("Thread entered standard yield with iteration {:?}", iteration_count); + self.state = ThreadState::DelayOnYield; + } + self.yield_state.should_watch() + } } impl<'mir, 'tcx> std::fmt::Debug for Thread<'mir, 'tcx> { @@ -159,6 +360,7 @@ impl<'mir, 'tcx> Default for Thread<'mir, 'tcx> { fn default() -> Self { Self { state: ThreadState::Enabled, + yield_state: YieldRecordState::MadeProgress, thread_name: None, stack: Vec::new(), join_status: ThreadJoinStatus::Joinable, @@ -212,6 +414,11 @@ pub struct ThreadManager<'mir, 'tcx> { /// /// Note that this vector also contains terminated threads. threads: IndexVec>, + /// Set of threads that are currently yielding. + yielding_thread_set: FxHashSet, + /// The maximum number of yields making no progress required + /// on all threads to report a live-lock. + max_yield_count: u32, /// This field is pub(crate) because the synchronization primitives /// (`crate::sync`) need a way to access it. pub(crate) sync: SynchronizationState, @@ -224,8 +431,8 @@ pub struct ThreadManager<'mir, 'tcx> { timeout_callbacks: FxHashMap>, } -impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { - fn default() -> Self { +impl<'mir, 'tcx> ThreadManager<'mir, 'tcx> { + pub fn new(max_yield_count: u32) -> Self { let mut threads = IndexVec::new(); // Create the main thread and add it to the list of threads. 
let mut main_thread = Thread::default(); @@ -235,6 +442,8 @@ impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { Self { active_thread: ThreadId::new(0), threads: threads, + yielding_thread_set: FxHashSet::default(), + max_yield_count, sync: SynchronizationState::default(), thread_local_alloc_ids: Default::default(), yield_active_thread: false, @@ -464,6 +673,68 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { return free_tls_statics; } + /// Called when the current thread performed + /// an atomic operation that potentially makes + /// progress + fn thread_yield_progress(&mut self) { + log::trace!("Thread {:?} has made progress via generic progress", self.active_thread); + self.threads[self.active_thread].yield_state.on_progress(); + } + + /// Called when the current thread performs + /// an atomic operation that may be visible to other threads. + fn thread_yield_atomic_wake(&mut self, alloc_id: AllocId, offset: Size, len: Size) { + let threads = &mut self.threads; + + // This thread performed an atomic update, mark as making progress. + log::trace!("Thread {:?} has made progress via atomic store", self.active_thread); + threads[self.active_thread].yield_state.on_progress(); + + // Awake all threads that were awaiting on changes to the modified atomic. + self.yielding_thread_set.drain_filter(move |&thread_id| { + let thread = &mut threads[thread_id]; + if thread.yield_state.should_wake_atomic(alloc_id, offset, len) { + thread.state = ThreadState::Enabled; + thread.yield_state.on_progress(); + true + } else { + false + } + }); + } + + /// Called when the current thread performs + /// an atomic read operation and may want + /// to mark that variable as watched to wake + /// the current yield. + fn thread_yield_atomic_watch(&mut self, alloc_id: AllocId, alloc_size: Size, offset: Size, len: Size) { + self.threads[self.active_thread].yield_state.on_watch_atomic(alloc_id, alloc_size, offset, len) + } + + fn thread_yield_sync_wake(&mut self, sync: SyncObject) { + let threads = &mut self.threads; + + // This thread performed an sync update, mark as making progress. + log::trace!("Thread {:?} has made progress via sync object update", self.active_thread); + threads[self.active_thread].yield_state.on_progress(); + + // Awake all threads that were awaiting on changes to the sync object. + self.yielding_thread_set.drain_filter(move |&thread_id| { + let thread = &mut threads[thread_id]; + if thread.yield_state.should_wake_sync(sync) { + thread.state = ThreadState::Enabled; + thread.yield_state.on_progress(); + true + } else { + false + } + }); + } + + fn thread_yield_sync_watch(&mut self, sync: SyncObject) { + self.threads[self.active_thread].yield_state.on_watch_sync(sync); + } + /// Decide which action to take next and on which thread. /// /// The currently implemented scheduling policy is the one that is commonly @@ -504,28 +775,52 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { return Ok(SchedulingAction::ExecuteTimeoutCallback); } // No callbacks scheduled, pick a regular thread to execute. - if self.threads[self.active_thread].state == ThreadState::Enabled - && !self.yield_active_thread - { - // The currently active thread is still enabled, just continue with it. 
- return Ok(SchedulingAction::ExecuteStep); + if self.threads[self.active_thread].state == ThreadState::Enabled { + if self.yield_active_thread { + // The currently active thread has yielded, update the state + log::trace!("Yielding thread {:?}", self.active_thread); + if self.threads[self.active_thread].on_yield(self.max_yield_count) { + // The thread has a non-zero set of wake metadata to exit the yield + // so insert into the set of threads that may wake. + self.yielding_thread_set.insert(self.active_thread); + } + } else { + // The currently active thread is still enabled, just continue with it. + return Ok(SchedulingAction::ExecuteStep); + } } // We need to pick a new thread for execution. - for (id, thread) in self.threads.iter_enumerated() { - if thread.state == ThreadState::Enabled { - if !self.yield_active_thread || id != self.active_thread { - self.active_thread = id; - if let Some(data_race) = data_race { - data_race.thread_set_active(self.active_thread); - } - break; + // First try to select a thread that has not yielded. + let new_thread = if let Some(new_thread) = self.threads.iter_enumerated() + .find(|(_, thread)| thread.state == ThreadState::Enabled) { + Some(new_thread.0) + } else { + // No active threads, wake all non blocking yields and try again. + let mut new_thread = None; + for (id,thread) in self.threads.iter_enumerated_mut() { + if thread.state == ThreadState::DelayOnYield { + + // Re-enable the thread and start the next yield iteration. + thread.state = ThreadState::Enabled; + thread.yield_state.start_iteration(); + self.yielding_thread_set.remove(&id); + new_thread = new_thread.or(Some(id)); } } - } + new_thread + }; self.yield_active_thread = false; - if self.threads[self.active_thread].state == ThreadState::Enabled { + + // We found a valid thread to execute. + if let Some(new_thread) = new_thread { + self.active_thread = new_thread; + if let Some(data_race) = data_race { + data_race.thread_set_active(self.active_thread); + } + assert!(self.threads[self.active_thread].state == ThreadState::Enabled); return Ok(SchedulingAction::ExecuteStep); } + // We have not found a thread to execute. if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { unreachable!("all threads terminated without the main thread terminating?!"); @@ -535,6 +830,10 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { // sleep until the first callback. std::thread::sleep(sleep_time); Ok(SchedulingAction::ExecuteTimeoutCallback) + } else if self.threads.iter().any(|thread| thread.state == ThreadState::BlockedOnYield) { + // At least one thread is blocked on a yield with max iterations. + // Report a livelock instead of a deadlock. + throw_machine_stop!(TerminationInfo::Livelock); } else { throw_machine_stop!(TerminationInfo::Deadlock); } @@ -675,6 +974,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx fn block_thread(&mut self, thread: ThreadId) { let this = self.eval_context_mut(); this.machine.threads.block_thread(thread); + + // This is waiting on some other concurrency object + // so for yield live-lock detection it has made progress. + this.machine.threads.thread_yield_progress(); } #[inline] @@ -746,4 +1049,80 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx } Ok(()) } + + /// Called to state that some concurrent operation has occurred + /// and to invalidate any current live-lock metadata. 
+ #[inline] + fn thread_yield_concurrent_progress(&mut self) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_progress(); + } + + /// Mark internal mutex state as modified. + #[inline] + fn thread_yield_mutex_wake(&mut self, mutex: MutexId) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_sync_wake(SyncObject::Mutex(mutex)); + } + + /// Mark internal rw-lock state as modified. + #[inline] + fn thread_yield_rwlock_wake(&mut self, rwlock: RwLockId) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_sync_wake(SyncObject::RwLock(rwlock)); + } + + /// Mark internal cond-var state as modified. + #[inline] + fn thread_yield_condvar_wake(&mut self, condvar: CondvarId) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_sync_wake(SyncObject::Condvar(condvar)); + } + + /// Called when the current thread performs + /// an atomic operation that may be visible to other threads. + #[inline] + fn thread_yield_atomic_wake(&mut self, alloc_id: AllocId, offset: Size, len: Size) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_atomic_wake(alloc_id, offset, len); + } + + /// Mark the mutex as watched by the current thread's yield state. + #[inline] + fn thread_yield_mutex_watch(&mut self, mutex: MutexId) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_sync_watch(SyncObject::Mutex(mutex)); + } + + /// Mark the rw-lock as watched by the current thread's yield state. + #[inline] + fn thread_yield_rwlock_watch(&mut self, rwlock: RwLockId) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_sync_watch(SyncObject::RwLock(rwlock)); + } + + /// Mark the condvar as watched by the current thread's yield state. + #[inline] + fn thread_yield_condvar_watch(&mut self, condvar: CondvarId) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_sync_watch(SyncObject::Condvar(condvar)); + } + + /// Called when the current thread performs + /// an atomic read operation and may want + /// to mark that variable as watched to wake + /// the current yield. + #[inline] + fn thread_yield_atomic_watch(&mut self, alloc_id: AllocId, alloc_size: Size, offset: Size, len: Size) { + let this = self.eval_context_mut(); + this.machine.threads.thread_yield_atomic_watch(alloc_id, alloc_size, offset, len); + } + + /// Return the configured maximum number of spurious wake events + /// from thread yields. Used for diagnostics. + #[inline] + fn thread_yield_get_max_spurious_wake(&self) -> u32 { + let this = self.eval_context_ref(); + this.machine.threads.max_yield_count + } } diff --git a/tests/compile-fail/live_lock/live_lock_condvar.rs b/tests/compile-fail/live_lock/live_lock_condvar.rs new file mode 100644 index 0000000000..71954a7790 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_condvar.rs @@ -0,0 +1,32 @@ +// compile-flags: -Zmiri-disable-isolation +// ignore-windows: Concurrency on Windows is not supported yet. + +// FIXME: the implicit mutex unlock & lock counts as forward progress with the current detector, +// so this runs forever. Ideally this case should be detected. + +// ignore-linux: currently not detected. +// ignore-macos: currently not detected.
+ +use std::sync::{Mutex, Condvar}; +use std::time::Duration; + +extern "Rust" { + fn miri_yield_thread(); +} + + +fn main() { + let mutex = Mutex::new(()); + let condvar = Condvar::new(); + + let mut lock = mutex.lock().unwrap(); + loop { + match condvar.wait_timeout(lock, Duration::from_millis(100)) { + Ok(_) => break, + Err(err) => { + lock = err.into_inner().0; + unsafe { miri_yield_thread(); } //~ERROR livelock + } + } + } +} diff --git a/tests/compile-fail/live_lock/live_lock_contention.rs b/tests/compile-fail/live_lock/live_lock_contention.rs new file mode 100644 index 0000000000..1c22f3cee1 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_contention.rs @@ -0,0 +1,38 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} +fn spin_await(lock: &AtomicUsize, target: usize) { + while lock.load(Ordering::Acquire) != target { unsafe { miri_yield_thread(); } } //~ERROR livelock +} + + +fn main() { + let a = Arc::new(AtomicUsize::new(0)); + let b = a.clone(); + let c = a.clone(); + let d = a.clone(); + let j1 = spawn(move || { + spin_await(&a, 1) + }); + let j2 = spawn(move || { + spin_await(&b, 1) + }); + let j3 = spawn(move || { + spin_await(&c, 1) + }); + let j4 = spawn(move || { + d.store(2, Ordering::Release); + d.store(3, Ordering::Release); + d.store(4, Ordering::Release); + }); + j1.join().unwrap(); + j2.join().unwrap(); + j3.join().unwrap(); + j4.join().unwrap(); +} diff --git a/tests/compile-fail/live_lock/live_lock_deadlock.rs b/tests/compile-fail/live_lock/live_lock_deadlock.rs new file mode 100644 index 0000000000..87e1feecdc --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_deadlock.rs @@ -0,0 +1,34 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} +fn spin_lock(lock: &AtomicUsize) { + while lock.compare_and_swap(0, 1, Ordering::AcqRel) != 0 { unsafe { miri_yield_thread(); } } //~ERROR livelock +} + + +fn main() { + let a1 = Arc::new(AtomicUsize::new(0)); + let a2 = a1.clone(); + let b1 = Arc::new(AtomicUsize::new(0)); + let b2 = b1.clone(); + let c1 = Arc::new(AtomicUsize::new(1)); + let c2 = c1.clone(); + let j1 = spawn(move || { + spin_lock(&a1); // Acquire a. + spin_lock(&c1); // Waits for c2.store to execute. + spin_lock(&b1); // Livelock waiting for b. + }); + let j2 = spawn(move || { + c2.store(0, Ordering::Release); + spin_lock(&b2); // Acquire b. + spin_lock(&a2); // Livelock waiting for a. + }); + j1.join().unwrap(); + j2.join().unwrap(); +} diff --git a/tests/compile-fail/live_lock/live_lock_multi_read.rs b/tests/compile-fail/live_lock/live_lock_multi_read.rs new file mode 100644 index 0000000000..db3fec5af7 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_multi_read.rs @@ -0,0 +1,46 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} +fn spin_await(lock: &AtomicUsize, target: usize) { + while lock.load(Ordering::Acquire) != target { + // Extra reads before a yield: + // regression test for a bug with a wrong + // comparison when detecting changes to watched atomic variables.
+ let _ignore1 = lock.load(Ordering::Relaxed); + let _ignore2 = lock.load(Ordering::Acquire); + + unsafe { miri_yield_thread(); } //~ERROR livelock + } +} + + +fn main() { + let a = Arc::new(AtomicUsize::new(0)); + let b = a.clone(); + let c = a.clone(); + let d = a.clone(); + let j1 = spawn(move || { + spin_await(&a, 1) + }); + let j2 = spawn(move || { + spin_await(&b, 1) + }); + let j3 = spawn(move || { + spin_await(&c, 1) + }); + let j4 = spawn(move || { + d.store(2, Ordering::Release); + d.store(3, Ordering::Release); + d.store(4, Ordering::Release); + }); + j1.join().unwrap(); + j2.join().unwrap(); + j3.join().unwrap(); + j4.join().unwrap(); +} diff --git a/tests/compile-fail/live_lock/live_lock_separate.rs b/tests/compile-fail/live_lock/live_lock_separate.rs new file mode 100644 index 0000000000..8e13d360fc --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_separate.rs @@ -0,0 +1,33 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} +fn spin_await(lock: &AtomicUsize, target: usize) { + while lock.load(Ordering::Acquire) != target { unsafe { miri_yield_thread(); } } //~ERROR livelock +} + + +fn main() { + let a = Arc::new(AtomicUsize::new(0)); + let b = Arc::new(AtomicUsize::new(0)); + let c = Arc::new(AtomicUsize::new(0)); + let j1 = spawn(move || { + spin_await(&a, 1) + }); + let j2 = spawn(move || { + spin_await(&b, 2) + }); + let j3 = spawn(move || { + for i in 0..256 { + c.store(i, Ordering::Release) + } + }); + j1.join().unwrap(); + j2.join().unwrap(); + j3.join().unwrap(); +} diff --git a/tests/compile-fail/live_lock/live_lock_spin_deadlock.rs b/tests/compile-fail/live_lock/live_lock_spin_deadlock.rs new file mode 100644 index 0000000000..568c51a6e7 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_spin_deadlock.rs @@ -0,0 +1,51 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} + +struct SpinLock(AtomicUsize); +impl SpinLock { + fn new() -> Self { + Self(AtomicUsize::new(0)) + } + fn lock(&self) { + loop { + if let Ok(_) = self.0.compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed) { + break + } else { + unsafe { miri_yield_thread(); } //~ERROR livelock + } + } + } + fn unlock(&self) { + self.0.store(0, Ordering::Release); + } +} + +fn main() { + // forces a deadlock via yield points + let shared = Arc::new((SpinLock::new(),SpinLock::new())); + let s1 = shared.clone(); + let s2 = shared.clone(); + let j1 = spawn(move || { + s1.0.lock(); + std::thread::yield_now(); + s1.1.lock(); + s1.1.unlock(); + s1.0.unlock(); + }); + let j2 = spawn(move || { + s2.1.lock(); + std::thread::yield_now(); + s2.0.lock(); + s2.0.unlock(); + s2.1.unlock(); + }); + j1.join().unwrap(); + j2.join().unwrap(); +} diff --git a/tests/compile-fail/live_lock/live_lock_try_mutex.rs b/tests/compile-fail/live_lock/live_lock_try_mutex.rs new file mode 100644 index 0000000000..a813441630 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_try_mutex.rs @@ -0,0 +1,29 @@ +// ignore-windows: Concurrency on Windows is not supported yet. 
+ +use std::sync::{Arc, Mutex}; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} + + +fn main() { + let shared = Arc::new(Mutex::new(0usize)); + let s1 = shared.clone(); + let mut s_guard = shared.lock().unwrap(); + let j1 = spawn(move || { + let mut a_guard = loop { + // yield loop for try-lock. + if let Ok(guard) = s1.try_lock() { + break guard + } else { + unsafe { miri_yield_thread(); } //~ERROR livelock + } + }; + *a_guard = 2; + }); + + j1.join().unwrap(); + *s_guard = 1; +} diff --git a/tests/compile-fail/live_lock/live_lock_try_rwlock_read.rs b/tests/compile-fail/live_lock/live_lock_try_rwlock_read.rs new file mode 100644 index 0000000000..431a7859a2 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_try_rwlock_read.rs @@ -0,0 +1,28 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::{Arc, RwLock}; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} + + +fn main() { + let shared = Arc::new(RwLock::new(0usize)); + let s1 = shared.clone(); + let mut s_guard = shared.write().unwrap(); + let j1 = spawn(move || { + let _a_guard = loop { + // yield loop for try-lock. + if let Ok(guard) = s1.try_read() { + break guard + } else { + unsafe { miri_yield_thread(); } //~ERROR livelock + } + }; + }); + + j1.join().unwrap(); + *s_guard = 1; +} diff --git a/tests/compile-fail/live_lock/live_lock_try_rwlock_write.rs b/tests/compile-fail/live_lock/live_lock_try_rwlock_write.rs new file mode 100644 index 0000000000..f10e12e4d7 --- /dev/null +++ b/tests/compile-fail/live_lock/live_lock_try_rwlock_write.rs @@ -0,0 +1,29 @@ +// ignore-windows: Concurrency on Windows is not supported yet. + +use std::sync::{Arc, RwLock}; +use std::thread::spawn; + +extern "Rust" { + fn miri_yield_thread(); +} + + +fn main() { + let shared = Arc::new(RwLock::new(0usize)); + let s1 = shared.clone(); + let mut s_guard = shared.write().unwrap(); + let j1 = spawn(move || { + let mut a_guard = loop { + // yield loop for try-lock. + if let Ok(guard) = s1.try_write() { + break guard + } else { + unsafe { miri_yield_thread(); } //~ERROR livelock + } + }; + *a_guard = 2; + }); + + j1.join().unwrap(); + *s_guard = 1; +} diff --git a/tests/run-pass/concurrency/sync.rs b/tests/run-pass/concurrency/sync.rs index e97da415cb..102296d8aa 100644 --- a/tests/run-pass/concurrency/sync.rs +++ b/tests/run-pass/concurrency/sync.rs @@ -1,5 +1,5 @@ // ignore-windows: Concurrency on Windows is not supported yet. -// compile-flags: -Zmiri-disable-isolation +// compile-flags: -Zmiri-disable-isolation -Zmiri-max-yield-iterations=3 use std::sync::mpsc::{channel, sync_channel}; use std::sync::{Arc, Barrier, Condvar, Mutex, Once, RwLock}; diff --git a/tests/run-pass/concurrency/sync_singlethread.rs b/tests/run-pass/concurrency/sync_singlethread.rs index ab0203906d..251cd98aa6 100644 --- a/tests/run-pass/concurrency/sync_singlethread.rs +++ b/tests/run-pass/concurrency/sync_singlethread.rs @@ -1,3 +1,5 @@ +// compile-flags: -Zmiri-max-yield-iterations=3 + use std::sync::{Mutex, TryLockError}; use std::sync::atomic; use std::hint; diff --git a/tests/run-pass/concurrency/tls_lib_drop.rs b/tests/run-pass/concurrency/tls_lib_drop.rs index 46f59ef620..fa2d09a6e5 100644 --- a/tests/run-pass/concurrency/tls_lib_drop.rs +++ b/tests/run-pass/concurrency/tls_lib_drop.rs @@ -1,4 +1,5 @@ // ignore-windows: Concurrency on Windows is not supported yet. 
+// compile-flags: -Zmiri-max-yield-iterations=10 use std::cell::RefCell; use std::thread; diff --git a/tests/run-pass/concurrency/yield_schedule.rs b/tests/run-pass/concurrency/yield_schedule.rs new file mode 100644 index 0000000000..7d65ad6c9d --- /dev/null +++ b/tests/run-pass/concurrency/yield_schedule.rs @@ -0,0 +1,225 @@ +// ignore-windows: Concurrency on Windows is not supported yet. +// compile-flags: -Zmiri-disable-isolation + +#![feature(once_cell)] + +use std::sync::{Arc, Barrier, Mutex, RwLock, Condvar}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::lazy::SyncOnceCell as OnceCell; +use std::thread::spawn; +use std::time::Duration; + +extern "Rust" { + fn miri_yield_thread(); +} + +//Variant of once_cell_does_not_leak_partially_constructed_boxes from matklad/once_cell +// with scope replaced with manual thread joins. +fn once_cell_test1() { + let n_tries = 1; + let n_readers = 1; + let n_writers = 1; + const MSG: &str = "Hello, World"; + + for _ in 0..n_tries { + let cell: Arc> = Arc::new(OnceCell::new()); + let mut joins = Vec::new(); + for _ in 0..n_readers { + let cell = cell.clone(); + joins.push(spawn(move || loop { + if let Some(msg) = cell.get() { + assert_eq!(msg, MSG); + break; + } + //Spin loop - add thread yield for liveness + std::thread::yield_now(); + })); + } + for _ in 0..n_writers { + let cell = cell.clone(); + joins.push(spawn(move || { + let _ = cell.set(MSG.to_owned()); + })); + } + for join in joins { + join.join().unwrap(); + } + } +} + + +// Variant of get_does_not_block from matklad/once_cell +// with scope replaced with manual thread joins. +fn once_cell_test2() { + let cell: Arc> = Arc::new(OnceCell::new()); + let barrier = Arc::new(Barrier::new(2)); + let join = { + let cell = cell.clone(); + let barrier = barrier.clone(); + spawn(move || { + cell.get_or_init(|| { + barrier.wait(); + barrier.wait(); + "hello".to_string() + }); + }) + }; + barrier.wait(); + assert_eq!(cell.get(), None); + barrier.wait(); + join.join().unwrap(); + assert_eq!(cell.get(), Some(&"hello".to_string())); +} + +fn yield_with_mutex() { + let shared = Arc::new(Mutex::new(0usize)); + let s1 = shared.clone(); + let s_guard = shared.lock().unwrap(); + let j1 = spawn(move || { + let mut a_guard = loop { + // yield loop for try-lock. + if let Ok(guard) = s1.try_lock() { + break guard + } else { + std::thread::yield_now(); + } + }; + *a_guard = 2; + }); + + // Dropping after yield will only terminate + // if wake from blocking is implemented. + std::thread::yield_now(); + drop(s_guard); + j1.join().unwrap(); +} + +fn yield_with_rwlock_write() { + let shared = Arc::new(RwLock::new(0usize)); + let s1 = shared.clone(); + let s_guard = shared.write().unwrap(); + let j1 = spawn(move || { + let mut a_guard = loop { + // yield loop for try-lock. + if let Ok(guard) = s1.try_write() { + break guard + } else { + std::thread::yield_now(); + } + }; + *a_guard = 2; + }); + + // Dropping after yield will only terminate + // if wake from blocking is implemented. + std::thread::yield_now(); + drop(s_guard); + j1.join().unwrap(); +} + +fn yield_with_rwlock_read() { + let shared = Arc::new(RwLock::new(0usize)); + let s1 = shared.clone(); + let s_guard = shared.write().unwrap(); + let j1 = spawn(move || { + let _a_guard = loop { + // yield loop for try-lock. + if let Ok(guard) = s1.try_read() { + break guard + } else { + std::thread::yield_now(); + } + }; + }); + + // Dropping after yield will only terminate + // if wake from blocking is implemented. 
+ std::thread::yield_now(); + drop(s_guard); + j1.join().unwrap(); +} + +fn yield_with_condvar() { + let shared = Arc::new((Condvar::new(), Mutex::new(()))); + let s1 = shared.clone(); + let j1 = spawn(move || { + let mut lock = s1.1.lock().unwrap(); + loop { + match s1.0.wait_timeout(lock, Duration::from_millis(100)) { + Ok(_) => break, + Err(err) => { + lock = err.into_inner().0; + std::thread::yield_now(); + } + } + } + }); + + // Signal after yield yield will only terminate + // if wake from blocking is implemented. + std::thread::yield_now(); + shared.0.notify_one(); + j1.join().unwrap(); +} + + +fn print_yield_counters() { + let shared = Arc::new(AtomicUsize::new(0usize)); + let make_new = || { + let shared = shared.clone(); + move || { + let mut array = [0; 10]; + for i in 0..10 { + array[i] = shared.fetch_add(1, Ordering::SeqCst); + std::thread::yield_now(); + } + array + } + }; + let j1 = spawn(make_new()); + let j2 = spawn(make_new()); + let j3 = spawn(make_new()); + let j4 = spawn(make_new()); + println!("Interleave Yield"); + println!("Thread 1 = {:?}", j1.join().unwrap()); + println!("Thread 2 = {:?}", j2.join().unwrap()); + println!("Thread 3 = {:?}", j3.join().unwrap()); + println!("Thread 4 = {:?}", j4.join().unwrap()); +} + +fn spin_loop() { + static FLAG: AtomicUsize = AtomicUsize::new(0); + let fun = || { + while FLAG.load(Ordering::Acquire) == 0 { + // spin and wait + // Note: the thread yield or spin loop hint + // is required for termination, otherwise + // this will run forever. + + // Note: should change to use this once cfg(miri) to use intrinsic. + //std::sync::atomic::spin_loop_hint(); + unsafe { miri_yield_thread(); } + } + }; + let j1 = spawn(fun); + let j2 = spawn(fun); + let j3 = spawn(fun); + let j4 = spawn(fun); + std::thread::yield_now(); + FLAG.store(1, Ordering::Release); + j1.join().unwrap(); + j2.join().unwrap(); + j3.join().unwrap(); + j4.join().unwrap(); +} + +fn main() { + once_cell_test1(); + once_cell_test2(); + yield_with_mutex(); + yield_with_rwlock_write(); + yield_with_rwlock_read(); + yield_with_condvar(); + print_yield_counters(); + spin_loop(); +} diff --git a/tests/run-pass/concurrency/yield_schedule.stderr b/tests/run-pass/concurrency/yield_schedule.stderr new file mode 100644 index 0000000000..03676519d4 --- /dev/null +++ b/tests/run-pass/concurrency/yield_schedule.stderr @@ -0,0 +1,2 @@ +warning: thread support is experimental and incomplete: weak memory effects are not emulated. + diff --git a/tests/run-pass/concurrency/yield_schedule.stdout b/tests/run-pass/concurrency/yield_schedule.stdout new file mode 100644 index 0000000000..f9c7bbb18f --- /dev/null +++ b/tests/run-pass/concurrency/yield_schedule.stdout @@ -0,0 +1,5 @@ +Interleave Yield +Thread 1 = [0, 4, 8, 12, 16, 20, 24, 28, 32, 36] +Thread 2 = [1, 5, 9, 13, 17, 21, 25, 29, 33, 37] +Thread 3 = [2, 6, 10, 14, 18, 22, 26, 30, 34, 38] +Thread 4 = [3, 7, 11, 15, 19, 23, 27, 31, 35, 39]
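Usage note (illustrative, not part of the patch): as the comment in `spin_loop` above suggests, portable user code would gate the Miri-only `miri_yield_thread` shim behind `cfg(miri)` and fall back to a plain spin-loop hint when running natively. A minimal sketch of that pattern follows; the helper name `yield_or_spin` and the surrounding example are assumptions for illustration only.

#[cfg(miri)]
extern "Rust" {
    fn miri_yield_thread();
}

// Yield to Miri's scheduler under Miri so the live-lock detector can watch the
// spin loop; otherwise emit a plain spin-loop hint.
fn yield_or_spin() {
    #[cfg(miri)]
    unsafe { miri_yield_thread(); }
    #[cfg(not(miri))]
    std::hint::spin_loop();
}

fn main() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let flag = Arc::new(AtomicBool::new(false));
    let setter = Arc::clone(&flag);
    let handle = std::thread::spawn(move || setter.store(true, Ordering::Release));
    // Spin until the other thread performs the release store.
    while !flag.load(Ordering::Acquire) {
        yield_or_spin();
    }
    handle.join().unwrap();
}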