diff --git a/Cargo.toml b/Cargo.toml
index 8cc994596..da14b480d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,24 +1,24 @@
 [package]
-name = "rayon"
+name = "rustc-rayon"
 # Reminder to update html_root_url in lib.rs when updating version
-version = "1.2.0"
+version = "0.3.0"
 authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
-description = "Simple work-stealing parallelism for Rust"
+description = "Simple work-stealing parallelism for Rust - fork for rustc"
 license = "Apache-2.0/MIT"
-repository = "https://github.com/rayon-rs/rayon"
-documentation = "https://docs.rs/rayon/"
+repository = "https://github.com/rust-lang/rustc-rayon"
+documentation = "https://docs.rs/rustc-rayon/"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
 categories = ["concurrency"]
 exclude = ["/ci/*", "/scripts/*", "/.travis.yml", "/appveyor.yml", "/bors.toml"]
 
 [workspace]
-members = ["rayon-demo", "rayon-core", "rayon-futures"]
+members = ["rayon-core"]
 exclude = ["ci"]
 
 [dependencies]
-rayon-core = { version = "1.6.0", path = "rayon-core" }
+rustc-rayon-core = { version = "0.3", path = "rayon-core" }
 crossbeam-deque = "0.7"
 
 # This is a public dependency!
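A consumer of the fork pins the renamed packages explicitly. A minimal manifest sketch (illustrative only, not part of this patch; rustc's real manifest differs):

```toml
[dependencies]
rustc-rayon = "0.3"
rustc-rayon-core = "0.3"
```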
diff --git a/README.md b/README.md
index 16c5ed116..6a0d54651 100644
--- a/README.md
+++ b/README.md
@@ -1,130 +1,10 @@
-# Rayon
+# rustc-rayon
 
-[![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon)
-[![Rayon documentation](https://docs.rs/rayon/badge.svg)](https://docs.rs/rayon)
-[![Travis Status](https://travis-ci.org/rayon-rs/rayon.svg?branch=master)](https://travis-ci.org/rayon-rs/rayon)
-[![Appveyor status](https://ci.appveyor.com/api/projects/status/wre5dkx08gayy8hc/branch/master?svg=true)](https://ci.appveyor.com/project/cuviper/rayon/branch/master)
-[![Join the chat at https://gitter.im/rayon-rs/Lobby](https://badges.gitter.im/rayon-rs/Lobby.svg)](https://gitter.im/rayon-rs/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
-
-Rayon is a data-parallelism library for Rust. It is extremely
-lightweight and makes it easy to convert a sequential computation into
-a parallel one. It also guarantees data-race freedom. (You may also
-enjoy [this blog post][blog] about Rayon, which gives more background
-and details about how it works, or [this video][video], from the Rust
-Belt Rust conference.) Rayon is
-[available on crates.io](https://crates.io/crates/rayon), and
-[API Documentation is available on docs.rs](https://docs.rs/rayon/).
-
-[blog]: http://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
-[video]: https://www.youtube.com/watch?v=gof_OEv71Aw
-
-## Parallel iterators and more
-
-Rayon makes it drop-dead simple to convert sequential iterators into
-parallel ones: usually, you just change your `foo.iter()` call into
-`foo.par_iter()`, and Rayon does the rest:
-
-```rust
-use rayon::prelude::*;
-fn sum_of_squares(input: &[i32]) -> i32 {
-    input.par_iter() // <-- just change that!
-         .map(|&i| i * i)
-         .sum()
-}
-```
-
-[Parallel iterators] take care of deciding how to divide your data
-into tasks; it will dynamically adapt for maximum performance. If you
-need more flexibility than that, Rayon also offers the [join] and
-[scope] functions, which let you create parallel tasks on your own.
-For even more control, you can create [custom threadpools] rather than
-using Rayon's default, global threadpool.
-
-[Parallel iterators]: https://docs.rs/rayon/*/rayon/iter/index.html
-[join]: https://docs.rs/rayon/*/rayon/fn.join.html
-[scope]: https://docs.rs/rayon/*/rayon/fn.scope.html
-[custom threadpools]: https://docs.rs/rayon/*/rayon/struct.ThreadPool.html
-
-## No data races
-
-You may have heard that parallel execution can produce all kinds of
-crazy bugs. Well, rest easy. Rayon's APIs all guarantee **data-race
-freedom**, which generally rules out most parallel bugs (though not
-all). In other words, **if your code compiles**, it typically does the
-same thing it did before.
-
-For the most, parallel iterators in particular are guaranteed to
-produce the same results as their sequential counterparts. One caveat:
-If your iterator has side effects (for example, sending methods to
-other threads through a [Rust channel] or writing to disk), those side
-effects may occur in a different order. Note also that, in some cases,
-parallel iterators offer alternative versions of the sequential
-iterator methods that can have higher performance.
-
-[Rust channel]: https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html
-
-## Using Rayon
-
-[Rayon is available on crates.io](https://crates.io/crates/rayon). The
-recommended way to use it is to add a line into your Cargo.toml such
-as:
-
-```toml
-[dependencies]
-rayon = "1.1"
-```
-
-and then add the following to your `lib.rs`:
-
-```rust
-extern crate rayon;
-```
-
-To use the Parallel Iterator APIs, a number of traits have to be in
-scope. The easiest way to bring those things into scope is to use the
-[Rayon prelude](https://docs.rs/rayon/*/rayon/prelude/index.html). In
-each module where you would like to use the parallel iterator APIs,
-just add:
-
-```rust
-use rayon::prelude::*;
-```
-
-Rayon currently requires `rustc 1.28.0` or greater.
-
-## Contribution
-
-Rayon is an open source project! If you'd like to contribute to Rayon, check out [the list of "help wanted" issues](https://github.com/rayon-rs/rayon/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22). These are all (or should be) issues that are suitable for getting started, and they generally include a detailed set of instructions for what to do. Please ask questions if anything is unclear! Also, check out the [Guide to Development](https://github.com/rayon-rs/rayon/wiki/Guide-to-Development) page on the wiki. Note that all code submitted in PRs to Rayon is assumed to [be licensed under Rayon's dual MIT/Apache2 licensing](https://github.com/rayon-rs/rayon/blob/master/README.md#license).
-
-## Quick demo
-
-To see Rayon in action, check out the `rayon-demo` directory, which
-includes a number of demos of code using Rayon. For example, run this
-command to get a visualization of an nbody simulation. To see the
-effect of using Rayon, press `s` to run sequentially and `p` to run in
-parallel.
-
-```text
-> cd rayon-demo
-> cargo run --release -- nbody visualize
-```
-
-For more information on demos, try:
-
-```text
-> cd rayon-demo
-> cargo run --release -- --help
-```
-
-## Other questions?
-
-See [the Rayon FAQ][faq].
-
-[faq]: https://github.com/rayon-rs/rayon/blob/master/FAQ.md
+rustc-rayon is a fork of [the Rayon crate](https://github.com/rayon-rs/rayon/). It adds a few "in progress" features that rustc is using, mostly around deadlock detection. These features are not stable and should not be used by others -- though they may find their way into rayon proper at some point. In general, if you are not rustc, you should be using the real rayon crate, not rustc-rayon. =)
 
 ## License
 
-Rayon is distributed under the terms of both the MIT license and the
+rustc-rayon is a fork of rayon. rayon is distributed under the terms of both the MIT license and the
 Apache License (Version 2.0). See [LICENSE-APACHE](LICENSE-APACHE) and
 [LICENSE-MIT](LICENSE-MIT) for details. Opening a pull requests is
 assumed to signal agreement with these licensing terms.
diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index 72cd82718..2b32d5064 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -1,13 +1,12 @@
 [package]
-name = "rayon-core"
-version = "1.6.0" # reminder to update html_root_url attribute
+name = "rustc-rayon-core"
+version = "0.3.0" # reminder to update html_root_url attribute
 authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
-description = "Core APIs for Rayon"
+description = "Core APIs for Rayon - fork for rustc"
 license = "Apache-2.0/MIT"
-repository = "https://github.com/rayon-rs/rayon"
-documentation = "https://docs.rs/rayon/"
-links = "rayon-core"
+repository = "https://github.com/rust-lang/rustc-rayon"
+documentation = "https://docs.rs/rustc-rayon-core/"
 build = "build.rs"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
diff --git a/rayon-core/README.md b/rayon-core/README.md
index 0af629ce1..29bf4bde4 100644
--- a/rayon-core/README.md
+++ b/rayon-core/README.md
@@ -1,3 +1,5 @@
+Note: This is an unstable fork made for use in rustc
+
 Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool.
 
 Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirror in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join.
diff --git a/rayon-core/src/flex_scope/mod.rs b/rayon-core/src/flex_scope/mod.rs
new file mode 100644
index 000000000..509e787d4
--- /dev/null
+++ b/rayon-core/src/flex_scope/mod.rs
@@ -0,0 +1,123 @@
+use job::HeapJob;
+use latch::LatchProbe;
+use registry::{in_worker, Registry};
+use std::any::Any;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::sync::Mutex;
+use tlv;
+use unwind;
+
+struct ActiveFlexScope {
+    /// thread registry where `scope()` was executed.
+    registry: Arc<Registry>,
+
+    /// if some job panicked, the error is stored here; it will be
+    /// propagated to the one who created the scope
+    panic: Option<Box<dyn Any + Send + 'static>>,
+
+    active_jobs: usize,
+
+    terminated: bool,
+
+    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
+    tlv: usize,
+}
+
+pub struct FlexScope<'scope> {
+    data: Mutex<Option<ActiveFlexScope>>,
+    marker: PhantomData<fn(&'scope ()) -> &'scope ()>,
+}
+
+impl<'scope> FlexScope<'scope> {
+    pub fn new() -> Self {
+        Self {
+            data: Mutex::new(None),
+            marker: PhantomData,
+        }
+    }
+
+    pub fn activate<R>(&self, f: impl FnOnce() -> R) -> R {
+        // Activate the scope
+        let tlv = tlv::get();
+        {
+            let mut data = self.data.lock().unwrap();
+            assert!(data.is_none(), "scope was already activated");
+            *data = Some(ActiveFlexScope {
+                active_jobs: 1,
+                registry: Registry::current(),
+                panic: None,
+                terminated: false,
+                tlv,
+            })
+        }
+
+        // Run the closure
+        let result = self.execute_job(f);
+
+        // Wait on the remaining tasks.
+        in_worker(|owner_thread, _| unsafe {
+            owner_thread.wait_until(&self);
+        });
+
+        // Restore the TLV if we ran some jobs while waiting
+        tlv::set(tlv);
+
+        let panic = {
+            let mut data = self.data.lock().unwrap();
+            let panic = data.as_mut().unwrap().panic.take();
+
+            // Deactivate the scope
+            *data = None;
+
+            panic
+        };
+
+        if let Some(panic) = panic {
+            unwind::resume_unwinding(panic);
+        }
+
+        result.unwrap()
+    }
+
+    pub fn spawn(&self, f: impl FnOnce() + Send + 'scope) {
+        let mut data = self.data.lock().unwrap();
+        let data = data.as_mut().expect("the scope is not active");
+        assert!(!data.terminated, "the scope is terminated");
+        assert!(data.active_jobs != std::usize::MAX);
+        data.active_jobs += 1;
+
+        let job_ref = unsafe {
+            Box::new(HeapJob::new(data.tlv, move || {
+                self.execute_job(move || f());
+            }))
+            .as_job_ref()
+        };
+
+        // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+        // thread of this pool, so we can't just push to the local worker thread.
+        data.registry.inject_or_push(job_ref);
+    }
+
+    fn execute_job<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
+        let result = unwind::halt_unwinding(f);
+        let mut data = self.data.lock().unwrap();
+        let data = data.as_mut().unwrap();
+        data.active_jobs -= 1;
+        if data.active_jobs == 0 {
+            // Mark the scope as terminated once the job count hits 0.
+            // This ensures other threads cannot spawn more jobs.
+            data.terminated = true;
+        }
+        result.map(|r| Some(r)).unwrap_or_else(|panic| {
+            data.panic = Some(panic);
+            None
+        })
+    }
+}
+
+impl<'scope> LatchProbe for FlexScope<'scope> {
+    fn probe(&self) -> bool {
+        self.data.lock().unwrap().as_ref().unwrap().active_jobs == 0
+    }
+}
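`FlexScope` is new in this fork. A minimal usage sketch (illustrative, not part of the patch): `activate` runs the closure, waits for every `spawn`ed job, and rethrows any panic, while `spawn` may be called from any thread for as long as the scope is active.

```rust
use rustc_rayon_core::FlexScope;

fn run_jobs() {
    let scope = FlexScope::new();
    scope.activate(|| {
        for i in 0..4 {
            // Jobs may outlive this closure, but not the scope itself.
            scope.spawn(move || println!("job {} ran in the pool", i));
        }
    });
    // `activate` has waited for all four jobs by this point.
}
```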
diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index cd205990d..2eea398c4 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -3,6 +3,7 @@ use latch::Latch;
 use std::any::Any;
 use std::cell::UnsafeCell;
 use std::mem;
+use tlv;
 use unwind;
 
 pub(super) enum JobResult<T> {
@@ -73,6 +74,7 @@ where
     pub(super) latch: L,
     func: UnsafeCell<Option<F>>,
     result: UnsafeCell<JobResult<R>>,
+    tlv: usize,
 }
 
 impl<L, F, R> StackJob<L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
+    pub(super) fn new(tlv: usize, func: F, latch: L) -> StackJob<L, F, R> {
         StackJob {
             latch,
             func: UnsafeCell::new(Some(func)),
             result: UnsafeCell::new(JobResult::None),
+            tlv,
         }
     }
 
@@ -114,6 +117,7 @@ where
         }
 
         let this = &*this;
+        tlv::set(this.tlv);
         let abort = unwind::AbortIfPanic;
         let func = (*this.func.get()).take().unwrap();
         (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
@@ -136,15 +140,17 @@ where
     BODY: FnOnce() + Send,
 {
     job: UnsafeCell<Option<BODY>>,
+    tlv: usize,
 }
 
 impl<BODY> HeapJob<BODY>
 where
     BODY: FnOnce() + Send,
 {
-    pub(super) fn new(func: BODY) -> Self {
+    pub(super) fn new(tlv: usize, func: BODY) -> Self {
         HeapJob {
             job: UnsafeCell::new(Some(func)),
+            tlv,
         }
     }
 
@@ -163,6 +169,7 @@ where
 {
     unsafe fn execute(this: *const Self) {
         let this: Box<Self> = mem::transmute(this);
+        tlv::set(this.tlv);
         let job = (*this.job.get()).take().unwrap();
         job();
     }
diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs
index 676fbdfbf..7f8a5a2fc 100644
--- a/rayon-core/src/join/mod.rs
+++ b/rayon-core/src/join/mod.rs
@@ -3,6 +3,7 @@ use latch::{LatchProbe, SpinLatch};
 use log::Event::*;
 use registry::{self, WorkerThread};
 use std::any::Any;
+use tlv;
 use unwind;
 use FnContext;
 
@@ -135,10 +136,11 @@ where
             worker: worker_thread.index()
         });
 
+        let tlv = tlv::get();
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(call_b(oper_b), SpinLatch::new());
+        let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new());
         let job_b_ref = job_b.as_job_ref();
         worker_thread.push(job_b_ref);
 
@@ -146,7 +148,7 @@ where
         let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
         let result_a = match status_a {
             Ok(v) => v,
-            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
+            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv),
         };
 
         // Now that task A has finished, try to pop job B from the
@@ -163,7 +165,11 @@ where
                     log!(PoppedRhs {
                         worker: worker_thread.index()
                     });
+                    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+                    tlv::set(tlv);
+
                     let result_b = job_b.run_inline(injected);
+
                     return (result_a, result_b);
                 } else {
                     log!(PoppedJob {
@@ -183,6 +189,9 @@ where
             }
         }
 
+        // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+        tlv::set(tlv);
+
         (result_a, job_b.into_result())
     })
 }
@@ -195,7 +204,12 @@ unsafe fn join_recover_from_panic(
     worker_thread: &WorkerThread,
     job_b_latch: &SpinLatch,
     err: Box<dyn Any + Send>,
+    tlv: usize,
 ) -> ! {
     worker_thread.wait_until(job_b_latch);
+
+    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+    tlv::set(tlv);
+
     unwind::resume_unwinding(err)
 }
diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index de87ed523..cee976f28 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -20,9 +20,6 @@
 //! succeed.
 
 #![doc(html_root_url = "https://docs.rs/rayon-core/1.6")]
-#![deny(missing_debug_implementations)]
-#![deny(missing_docs)]
-#![deny(unreachable_pub)]
 
 use std::any::Any;
 use std::env;
@@ -50,6 +47,7 @@ mod log;
 #[macro_use]
 mod private;
 
+mod flex_scope;
 mod job;
 mod join;
 mod latch;
@@ -60,20 +58,26 @@ mod spawn;
 mod thread_pool;
 mod unwind;
 mod util;
+mod worker_local;
 
 mod compile_fail;
 mod test;
 
+pub mod tlv;
+
 #[cfg(rayon_unstable)]
 pub mod internal;
+pub use flex_scope::FlexScope;
 pub use join::{join, join_context};
 pub use registry::ThreadBuilder;
+pub use registry::{mark_blocked, mark_unblocked, Registry};
 pub use scope::{scope, Scope};
 pub use scope::{scope_fifo, ScopeFifo};
 pub use spawn::{spawn, spawn_fifo};
 pub use thread_pool::current_thread_has_pending_tasks;
 pub use thread_pool::current_thread_index;
 pub use thread_pool::ThreadPool;
+pub use worker_local::WorkerLocal;
 
 use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
 
@@ -145,6 +149,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
     /// The stack size for the created worker threads
     stack_size: Option<usize>,
 
+    /// Closure invoked on deadlock.
+    deadlock_handler: Option<Box<DeadlockHandler>>,
+
     /// Closure invoked on worker thread start.
     start_handler: Option<Box<StartHandler>>,
 
@@ -154,6 +161,12 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
     /// Closure invoked to spawn threads.
     spawn_handler: S,
 
+    /// Closure invoked when starting computations in a thread.
+    acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
+
+    /// Closure invoked when blocking in a thread.
+    release_thread_handler: Option<Box<ReleaseThreadHandler>>,
+
     /// If false, worker threads will execute spawned jobs in a
     /// "depth-first" fashion. If true, they will do a "breadth-first"
     /// fashion. Depth-first is the default.
@@ -172,6 +185,9 @@ pub struct Configuration {
 /// may be invoked multiple times in parallel.
 type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
 
+/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
+type DeadlockHandler = dyn Fn() + Send + Sync;
+
 /// The type for a closure that gets invoked when a thread starts. The
 /// closure is passed the index of the thread on which it is invoked.
 /// Note that this same closure may be invoked multiple times in parallel.
@@ -192,12 +208,23 @@ impl Default for ThreadPoolBuilder {
             stack_size: None,
             start_handler: None,
             exit_handler: None,
+            deadlock_handler: None,
+            acquire_thread_handler: None,
+            release_thread_handler: None,
             spawn_handler: DefaultSpawn,
             breadth_first: false,
         }
     }
 }
 
+/// The type for a closure that gets invoked before starting computations in a thread.
+/// Note that this same closure may be invoked multiple times in parallel.
+type AcquireThreadHandler = dyn Fn() + Send + Sync;
+
+/// The type for a closure that gets invoked before blocking in a thread.
+/// Note that this same closure may be invoked multiple times in parallel.
+type ReleaseThreadHandler = dyn Fn() + Send + Sync;
+
 impl ThreadPoolBuilder {
     /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
     pub fn new() -> Self {
@@ -300,7 +327,12 @@ impl<S> ThreadPoolBuilder<S>
                 Ok(())
             })
             .build()?;
-        Ok(with_pool(&pool))
+        let result = unwind::halt_unwinding(|| with_pool(&pool));
+        pool.wait_until_stopped();
+        match result {
+            Ok(result) => Ok(result),
+            Err(err) => unwind::resume_unwinding(err),
+        }
     });
 
     match result {
@@ -378,6 +410,9 @@ impl<S> ThreadPoolBuilder<S>
             stack_size: self.stack_size,
             start_handler: self.start_handler,
             exit_handler: self.exit_handler,
+            deadlock_handler: self.deadlock_handler,
+            acquire_thread_handler: self.acquire_thread_handler,
+            release_thread_handler: self.release_thread_handler,
             breadth_first: self.breadth_first,
         }
     }
@@ -536,6 +571,48 @@ impl<S> ThreadPoolBuilder<S>
         self.breadth_first
     }
 
+    /// Takes the current acquire thread callback, leaving `None`.
+    fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
+        self.acquire_thread_handler.take()
+    }
+
+    /// Set a callback to be invoked when starting computations in a thread.
+    pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
+    where
+        H: Fn() + Send + Sync + 'static,
+    {
+        self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
+        self
+    }
+
+    /// Takes the current release thread callback, leaving `None`.
+    fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
+        self.release_thread_handler.take()
+    }
+
+    /// Set a callback to be invoked when blocking in a thread.
+    pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
+    where
+        H: Fn() + Send + Sync + 'static,
+    {
+        self.release_thread_handler = Some(Box::new(release_thread_handler));
+        self
+    }
+
+    /// Takes the current deadlock callback, leaving `None`.
+    fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
+        self.deadlock_handler.take()
+    }
+
+    /// Set a callback to be invoked when a deadlock is detected.
+    pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
+    where
+        H: Fn() + Send + Sync + 'static,
+    {
+        self.deadlock_handler = Some(Box::new(deadlock_handler));
+        self
+    }
+
     /// Takes the current thread start callback, leaving `None`.
     fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
         self.start_handler.take()
@@ -689,8 +766,11 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
             ref get_thread_name,
             ref panic_handler,
             ref stack_size,
+            ref deadlock_handler,
             ref start_handler,
             ref exit_handler,
+            ref acquire_thread_handler,
+            ref release_thread_handler,
             spawn_handler: _,
             ref breadth_first,
         } = *self;
@@ -705,16 +785,22 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
         }
         let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
         let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
+        let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
         let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
         let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
+        let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
+        let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
 
         f.debug_struct("ThreadPoolBuilder")
             .field("num_threads", num_threads)
             .field("get_thread_name", &get_thread_name)
             .field("panic_handler", &panic_handler)
             .field("stack_size", &stack_size)
+            .field("deadlock_handler", &deadlock_handler)
             .field("start_handler", &start_handler)
             .field("exit_handler", &exit_handler)
+            .field("acquire_thread_handler", &acquire_thread_handler)
+            .field("release_thread_handler", &release_thread_handler)
             .field("breadth_first", &breadth_first)
             .finish()
     }
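Taken together, the new builder hooks might be wired up like this (illustrative sketch, not part of the patch; the closure bodies are invented, and rustc installs its own):

```rust
use rustc_rayon_core::{ThreadPool, ThreadPoolBuilder};

fn build_pool() -> ThreadPool {
    ThreadPoolBuilder::new()
        .num_threads(4)
        // Runs when a worker starts (or resumes) doing work,
        // e.g. to acquire a jobserver token.
        .acquire_thread_handler(|| {})
        // Runs right before a worker blocks or goes to sleep.
        .release_thread_handler(|| {})
        // Runs, with the sleep lock held, once every worker is asleep or
        // blocked in user code; recovery must happen on a non-Rayon thread.
        .deadlock_handler(|| eprintln!("thread pool deadlocked"))
        .build()
        .expect("failed to build thread pool")
}
```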
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 1c43ac8d8..4e7f1a7b1 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -24,7 +24,10 @@ use std::thread;
 use std::usize;
 use unwind;
 use util::leak;
-use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder};
+use {
+    AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
+    ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+};
 
 /// Thread builder used for customization via
 /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
@@ -133,13 +136,16 @@ where
     }
 }
 
-pub(super) struct Registry {
+pub struct Registry {
     thread_infos: Vec<ThreadInfo>,
     sleep: Sleep,
     injected_jobs: SegQueue<JobRef>,
     panic_handler: Option<Box<PanicHandler>>,
+    pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
     start_handler: Option<Box<StartHandler>>,
     exit_handler: Option<Box<ExitHandler>>,
+    pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
+    pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
 
     // When this latch reaches 0, it means that all work on this
     // registry must be complete. This is ensured in the following ways:
@@ -237,12 +243,15 @@ impl Registry {
 
         let registry = Arc::new(Registry {
             thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
-            sleep: Sleep::new(),
+            sleep: Sleep::new(n_threads),
            injected_jobs: SegQueue::new(),
             terminate_latch: CountLatch::new(),
             panic_handler: builder.take_panic_handler(),
+            deadlock_handler: builder.take_deadlock_handler(),
             start_handler: builder.take_start_handler(),
             exit_handler: builder.take_exit_handler(),
+            acquire_thread_handler: builder.take_acquire_thread_handler(),
+            release_thread_handler: builder.take_release_thread_handler(),
         });
 
         // If we return early or panic, make sure to terminate existing threads.
@@ -272,7 +281,7 @@ impl Registry {
         global_registry().clone()
     }
 
-    pub(super) fn current() -> Arc<Registry> {
+    pub fn current() -> Arc<Registry> {
         unsafe {
             let worker_thread = WorkerThread::current();
             if worker_thread.is_null() {
@@ -350,11 +359,24 @@ impl Registry {
 
     /// Waits for the worker threads to stop. This is used for testing
     /// -- so we can check that termination actually works.
-    #[cfg(test)]
     pub(super) fn wait_until_stopped(&self) {
+        self.release_thread();
         for info in &self.thread_infos {
             info.stopped.wait();
         }
+        self.acquire_thread();
+    }
+
+    pub(crate) fn acquire_thread(&self) {
+        if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
+            acquire_thread_handler();
+        }
+    }
+
+    pub(crate) fn release_thread(&self) {
+        if let Some(ref release_thread_handler) = self.release_thread_handler {
+            release_thread_handler();
+        }
     }
 
     /// ////////////////////////////////////////////////////////////////////////
@@ -495,6 +517,7 @@ impl Registry {
         // This thread isn't a member of *any* thread pool, so just block.
         debug_assert!(WorkerThread::current().is_null());
         let job = StackJob::new(
+            0,
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
@@ -503,7 +526,9 @@ impl Registry {
             l,
         );
         self.inject(&[job.as_job_ref()]);
+        self.release_thread();
         job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
+        self.acquire_thread();
         job.into_result()
     })
 }
@@ -519,6 +544,7 @@ impl Registry {
         debug_assert!(current_thread.registry().id() != self.id());
         let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
         let job = StackJob::new(
+            0,
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
@@ -564,6 +590,24 @@ impl Registry {
     }
 }
 
+/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
+/// if no other worker thread is active
+#[inline]
+pub fn mark_blocked() {
+    let worker_thread = WorkerThread::current();
+    assert!(!worker_thread.is_null());
+    unsafe {
+        let registry = &(*worker_thread).registry;
+        registry.sleep.mark_blocked(&registry.deadlock_handler)
+    }
+}
+
+/// Mark a previously blocked Rayon worker thread as unblocked
+#[inline]
+pub fn mark_unblocked(registry: &Registry) {
+    registry.sleep.mark_unblocked()
+}
+
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub(super) struct RegistryId {
     addr: usize,
@@ -603,12 +647,12 @@ pub(super) struct WorkerThread {
     /// local queue used for `spawn_fifo` indirection
     fifo: JobFifo,
 
-    index: usize,
+    pub(crate) index: usize,
 
     /// A weak random number generator.
     rng: XorShift64Star,
 
-    registry: Arc<Registry>,
+    pub(crate) registry: Arc<Registry>,
 }
 
 // This is a bit sketchy, but basically: the WorkerThread is
@@ -718,7 +762,10 @@ impl WorkerThread {
                 yields = self.registry.sleep.work_found(self.index, yields);
                 self.execute(job);
             } else {
-                yields = self.registry.sleep.no_work_found(self.index, yields);
+                yields = self
+                    .registry
+                    .sleep
+                    .no_work_found(self.index, yields, &self.registry);
             }
         }
 
@@ -810,6 +857,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
         }
     }
 
+    registry.acquire_thread();
     worker_thread.wait_until(&registry.terminate_latch);
 
     // Should not be any work left in our queue.
@@ -832,6 +880,8 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
         }
         // We're already exiting the thread, there's nothing else to do.
     }
+
+    registry.release_thread();
 }
 
 /// If already in a worker-thread, just execute `op`. Otherwise,
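A sketch of the intended calling convention for the new `mark_blocked`/`mark_unblocked` pair (hypothetical user code, not from this patch; `mark_blocked` must run on a worker thread of the pool):

```rust
use rustc_rayon_core::{mark_blocked, mark_unblocked, Registry};
use std::sync::mpsc::Receiver;

// Called on a worker thread that is about to block on `rx`.
fn blocking_recv(rx: &Receiver<u32>) -> u32 {
    mark_blocked(); // this worker no longer counts as active
    rx.recv().unwrap()
}

// Called by whichever thread produces the value, before sending it.
fn wake(registry: &Registry) {
    mark_unblocked(registry); // the blocked worker counts as active again
}
```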
diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs
index c3ad5b259..231e17726 100644
--- a/rayon-core/src/scope/mod.rs
+++ b/rayon-core/src/scope/mod.rs
@@ -15,6 +15,7 @@ use std::mem;
 use std::ptr;
 use std::sync::atomic::{AtomicPtr, Ordering};
 use std::sync::Arc;
+use tlv;
 use unwind;
 
 mod internal;
@@ -60,6 +61,9 @@ struct ScopeBase<'scope> {
     /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
     /// the closures are only *moved* across threads to be executed.
     marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
+
+    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
+    tlv: usize,
 }
 
 /// Create a "fork-join" scope `s` and invokes the closure with a
@@ -287,13 +291,10 @@ struct ScopeBase<'scope> {
 /// propagated at that point.
 pub fn scope<'scope, OP, R>(op: OP) -> R
 where
-    OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send,
-    R: Send,
+    OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope,
 {
-    in_worker(|owner_thread, _| {
-        let scope = Scope::<'scope>::new(owner_thread);
-        unsafe { scope.base.complete(owner_thread, || op(&scope)) }
-    })
+    let scope = Scope::<'scope>::new();
+    unsafe { scope.base.complete(|| op(&scope)) }
 }
 
 /// Create a "fork-join" scope `s` with FIFO order, and invokes the
@@ -378,19 +379,16 @@ where
 /// panics are propagated at that point.
 pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
 where
-    OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
-    R: Send,
+    OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope,
 {
-    in_worker(|owner_thread, _| {
-        let scope = ScopeFifo::<'scope>::new(owner_thread);
-        unsafe { scope.base.complete(owner_thread, || op(&scope)) }
-    })
+    let scope = ScopeFifo::<'scope>::new();
+    unsafe { scope.base.complete(|| op(&scope)) }
 }
 
 impl<'scope> Scope<'scope> {
-    fn new(owner_thread: &WorkerThread) -> Self {
+    fn new() -> Self {
         Scope {
-            base: ScopeBase::new(owner_thread),
+            base: ScopeBase::new(),
         }
     }
 
@@ -452,7 +450,7 @@ impl<'scope> Scope<'scope> {
     {
         self.base.increment();
         unsafe {
-            let job_ref = Box::new(HeapJob::new(move || {
+            let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
                 self.base.execute_job(move || body(self))
             }))
             .as_job_ref();
@@ -465,10 +463,10 @@ impl<'scope> Scope<'scope> {
 }
 
 impl<'scope> ScopeFifo<'scope> {
-    fn new(owner_thread: &WorkerThread) -> Self {
-        let num_threads = owner_thread.registry().num_threads();
+    fn new() -> Self {
+        let num_threads = Registry::current().num_threads();
         ScopeFifo {
-            base: ScopeBase::new(owner_thread),
+            base: ScopeBase::new(),
             fifos: (0..num_threads).map(|_| JobFifo::new()).collect(),
         }
     }
@@ -493,7 +491,7 @@ impl<'scope> ScopeFifo<'scope> {
     {
         self.base.increment();
         unsafe {
-            let job_ref = Box::new(HeapJob::new(move || {
+            let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
                 self.base.execute_job(move || body(self))
             }))
             .as_job_ref();
@@ -513,13 +511,14 @@ impl<'scope> ScopeFifo<'scope> {
 
 impl<'scope> ScopeBase<'scope> {
     /// Create the base of a new scope for the given worker thread
-    fn new(owner_thread: &WorkerThread) -> Self {
+    fn new() -> Self {
         ScopeBase {
-            owner_thread_index: owner_thread.index(),
-            registry: owner_thread.registry().clone(),
+            owner_thread_index: 0,
+            registry: Registry::current(),
             panic: AtomicPtr::new(ptr::null_mut()),
             job_completed_latch: CountLatch::new(),
             marker: PhantomData,
+            tlv: tlv::get(),
         }
     }
@@ -531,12 +530,16 @@ impl<'scope> ScopeBase<'scope> {
     /// appropriate.
     ///
     /// Unsafe because it must be executed on a worker thread.
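The relaxed bounds mean `scope` can now be entered from any thread and return a non-`Send` value; the `op` closure runs on the calling thread. An illustrative sketch (not from the patch; this would not compile against upstream rayon-core, where both `OP` and `R` must be `Send`):

```rust
use rustc_rayon_core::scope;
use std::rc::Rc;

fn demo() -> Rc<u32> {
    scope(|s| {
        s.spawn(|_| println!("parallel work happens in the pool"));
        Rc::new(42) // `Rc` is `!Send`; allowed by the fork's signature
    })
}
```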
-    unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R
+    unsafe fn complete<FUNC, R>(&self, func: FUNC) -> R
     where
         FUNC: FnOnce() -> R,
     {
         let result = self.execute_job_closure(func);
-        self.steal_till_jobs_complete(owner_thread);
+        in_worker(|owner_thread, _| {
+            self.steal_till_jobs_complete(owner_thread);
+        });
+        // Restore the TLV if we ran some jobs while waiting
+        tlv::set(self.tlv);
         result.unwrap() // only None if `op` panicked, and that would have been propagated
     }
@@ -613,6 +616,8 @@ impl<'scope> ScopeBase<'scope> {
             log!(ScopeCompletePanicked {
                 owner_thread: owner_thread.index()
             });
+            // Restore the TLV if we ran some jobs while waiting
+            tlv::set(self.tlv);
             let value: Box<Box<dyn Any + Send + 'static>> = mem::transmute(panic);
             unwind::resume_unwinding(*value);
         } else {
diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md
index bc2af869f..d889de5a5 100644
--- a/rayon-core/src/sleep/README.md
+++ b/rayon-core/src/sleep/README.md
@@ -386,3 +386,36 @@ some of them were hit hard:
 - 8-10% overhead on nbody-parreduce
 - 35% overhead on increment-all
 - 245% overhead on join-recursively
+
+# Deadlock detection
+
+This module tracks a number of variables in order to detect deadlocks due to user code blocking.
+These variables are stored in the `SleepData` struct, which itself is kept behind a mutex.
+It contains the following fields:
+- `worker_count` - The number of threads in the thread pool.
+- `active_threads` - The number of threads in the thread pool which are running
+  and aren't blocked in user code or sleeping.
+- `blocked_threads` - The number of threads which are blocked in user code.
+  This doesn't include threads blocked by Rayon.
+
+User code can indicate blocking by calling `mark_blocked` before blocking and
+calling `mark_unblocked` before unblocking a thread.
+This will adjust `active_threads` and `blocked_threads` accordingly.
+
+When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
+`worker_count` - `blocked_threads`, since we wake up all Rayon threads, but not threads blocked
+by user code.
+
+A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
+If we ignored `blocked_threads`, we would have a deadlock
+immediately when creating the thread pool.
+We would also deadlock once the thread pool ran out of work.
+It is not possible for Rayon itself to deadlock.
+Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
+
+We check for the deadlock condition when threads fall asleep in `Sleep::sleep` and
+when user code blocks a worker thread in `mark_blocked`.
+If a deadlock is detected, we call the user-provided deadlock handler while we hold the
+lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
+`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
+Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.
\ No newline at end of file
diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs
index 96f4c37ff..f953ff320 100644
--- a/rayon-core/src/sleep/mod.rs
+++ b/rayon-core/src/sleep/mod.rs
@@ -2,14 +2,39 @@
 //! for an overview.
 
 use log::Event::*;
+use registry::Registry;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Condvar, Mutex};
 use std::thread;
 use std::usize;
+use DeadlockHandler;
+
+struct SleepData {
+    /// The number of threads in the thread pool.
+    worker_count: usize,
+
+    /// The number of threads in the thread pool which are running and
+    /// aren't blocked in user code or sleeping.
+    active_threads: usize,
+
+    /// The number of threads which are blocked in user code.
+    /// This doesn't include threads blocked by this module.
+    blocked_threads: usize,
+}
+
+impl SleepData {
+    /// Checks if the conditions for a deadlock hold and if so calls the deadlock handler
+    #[inline]
+    pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
+        if self.active_threads == 0 && self.blocked_threads > 0 {
+            (deadlock_handler.as_ref().unwrap())();
+        }
+    }
+}
 
 pub(super) struct Sleep {
     state: AtomicUsize,
-    data: Mutex<()>,
+    data: Mutex<SleepData>,
     tickle: Condvar,
 }
@@ -20,14 +45,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
 const ROUNDS_UNTIL_ASLEEP: usize = 64;
 
 impl Sleep {
-    pub(super) fn new() -> Sleep {
+    pub(super) fn new(worker_count: usize) -> Sleep {
         Sleep {
             state: AtomicUsize::new(AWAKE),
-            data: Mutex::new(()),
+            data: Mutex::new(SleepData {
+                worker_count,
+                active_threads: worker_count,
+                blocked_threads: 0,
+            }),
             tickle: Condvar::new(),
         }
     }
 
+    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
+    /// if no other worker thread is active
+    #[inline]
+    pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
+        let mut data = self.data.lock().unwrap();
+        debug_assert!(data.active_threads > 0);
+        debug_assert!(data.blocked_threads < data.worker_count);
+        debug_assert!(data.active_threads > 0);
+        data.active_threads -= 1;
+        data.blocked_threads += 1;
+
+        data.deadlock_check(deadlock_handler);
+    }
+
+    /// Mark a previously blocked Rayon worker thread as unblocked
+    #[inline]
+    pub fn mark_unblocked(&self) {
+        let mut data = self.data.lock().unwrap();
+        debug_assert!(data.active_threads < data.worker_count);
+        debug_assert!(data.blocked_threads > 0);
+        data.active_threads += 1;
+        data.blocked_threads -= 1;
+    }
+
     fn anyone_sleeping(&self, state: usize) -> bool {
         state & SLEEPING != 0
     }
@@ -61,7 +114,12 @@ impl Sleep {
     }
 
     #[inline]
-    pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
+    pub(super) fn no_work_found(
+        &self,
+        worker_index: usize,
+        yields: usize,
+        registry: &Registry,
+    ) -> usize {
         log!(DidNotFindWork {
             worker: worker_index,
             yields: yields,
@@ -88,7 +146,7 @@ impl Sleep {
             }
         } else {
             debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
-            self.sleep(worker_index);
+            self.sleep(worker_index, registry);
             0
         }
     }
@@ -122,7 +180,10 @@ impl Sleep {
             old_state: old_state,
         });
         if self.anyone_sleeping(old_state) {
-            let _data = self.data.lock().unwrap();
+            let mut data = self.data.lock().unwrap();
+            // Set the active threads to the number of workers,
+            // excluding threads blocked by the user since we won't wake those up
+            data.active_threads = data.worker_count - data.blocked_threads;
             self.tickle.notify_all();
         }
     }
@@ -188,7 +249,7 @@ impl Sleep {
         self.worker_is_sleepy(state, worker_index)
     }
 
-    fn sleep(&self, worker_index: usize) {
+    fn sleep(&self, worker_index: usize, registry: &Registry) {
         loop {
             // Acquire here suffices. If we observe that the current worker is still
             // sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +296,7 @@ impl Sleep {
                 // reason for the `compare_exchange` to fail is if an
                 // awaken comes, in which case the next cycle around
                 // the loop will just return.
-                let data = self.data.lock().unwrap();
+                let mut data = self.data.lock().unwrap();
 
                 // This must be SeqCst on success because we want to
                 // ensure:
@@ -264,10 +325,18 @@ impl Sleep {
                     log!(FellAsleep {
                         worker: worker_index
                     });
+
+                    // Decrement the number of active threads and check for a deadlock
+                    data.active_threads -= 1;
+                    data.deadlock_check(&registry.deadlock_handler);
+
+                    registry.release_thread();
+
                     let _ = self.tickle.wait(data).unwrap();
                     log!(GotAwoken {
                         worker: worker_index
                     });
+                    registry.acquire_thread();
                     return;
                 }
             } else {
diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs
index 37b2d1b36..66e0e1f3d 100644
--- a/rayon-core/src/spawn/mod.rs
+++ b/rayon-core/src/spawn/mod.rs
@@ -92,7 +92,7 @@ where
     // executed. This ref is decremented at the (*) below.
     registry.increment_terminate_count();
 
-    Box::new(HeapJob::new({
+    Box::new(HeapJob::new(0, {
         let registry = registry.clone();
         move || {
             match unwind::halt_unwinding(func) {
diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs
index 093546ee0..fc1a90f64 100644
--- a/rayon-core/src/thread_pool/mod.rs
+++ b/rayon-core/src/thread_pool/mod.rs
@@ -274,6 +274,12 @@ impl ThreadPool {
         // We assert that `self.registry` has not terminated.
         unsafe { spawn::spawn_fifo_in(op, &self.registry) }
     }
+
+    pub(crate) fn wait_until_stopped(self) {
+        let registry = self.registry.clone();
+        drop(self);
+        registry.wait_until_stopped();
+    }
 }
 
 impl Drop for ThreadPool {
diff --git a/rayon-core/src/tlv.rs b/rayon-core/src/tlv.rs
new file mode 100644
index 000000000..f035d12d9
--- /dev/null
+++ b/rayon-core/src/tlv.rs
@@ -0,0 +1,30 @@
+//! Allows access to Rayon's thread-local value,
+//! which is preserved when moving jobs across threads.
+
+use std::cell::Cell;
+
+thread_local!(pub(crate) static TLV: Cell<usize> = Cell::new(0));
+
+/// Sets the current thread-local value to `value` inside the closure.
+/// The old value is restored when the closure ends
+pub fn with<F: FnOnce() -> R, R>(value: usize, f: F) -> R {
+    struct Reset(usize);
+    impl Drop for Reset {
+        fn drop(&mut self) {
+            TLV.with(|tlv| tlv.set(self.0));
+        }
+    }
+    let _reset = Reset(get());
+    TLV.with(|tlv| tlv.set(value));
+    f()
+}
+
+/// Sets the current thread-local value
+pub fn set(value: usize) {
+    TLV.with(|tlv| tlv.set(value));
+}
+
+/// Returns the current thread-local value
+pub fn get() -> usize {
+    TLV.with(|tlv| tlv.get())
+}
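The TLV is an opaque `usize` that rayon-core snapshots into every job at creation and re-installs on whichever worker eventually runs the job; rustc keeps a pointer to its current task context there. Basic use of the module (illustrative only):

```rust
use rustc_rayon_core::tlv;

fn demo() {
    assert_eq!(tlv::get(), 0); // default value
    tlv::with(0x42, || {
        // Jobs created while the value is set (via join/scope/spawn)
        // capture it and see it restored while they execute.
        assert_eq!(tlv::get(), 0x42);
    });
    assert_eq!(tlv::get(), 0); // restored by the `Drop` guard
}
```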
diff --git a/rayon-core/src/worker_local.rs b/rayon-core/src/worker_local.rs
new file mode 100644
index 000000000..4b92bd939
--- /dev/null
+++ b/rayon-core/src/worker_local.rs
@@ -0,0 +1,74 @@
+use registry::{Registry, WorkerThread};
+use std::fmt;
+use std::ops::Deref;
+use std::sync::Arc;
+
+#[repr(align(64))]
+#[derive(Debug)]
+struct CacheAligned<T>(T);
+
+/// Holds worker-local values for each thread in a thread pool.
+/// You can only access a worker-local value through the `Deref` impl,
+/// and only on the thread pool it was constructed on; it will panic otherwise.
+pub struct WorkerLocal<T> {
+    locals: Vec<CacheAligned<T>>,
+    registry: Arc<Registry>,
+}
+
+unsafe impl<T> Send for WorkerLocal<T> {}
+unsafe impl<T> Sync for WorkerLocal<T> {}
+
+impl<T> WorkerLocal<T> {
+    /// Creates a new worker local where the `initial` closure computes the
+    /// value this worker local should take for each thread in the thread pool.
+    #[inline]
+    pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
+        let registry = Registry::current();
+        WorkerLocal {
+            locals: (0..registry.num_threads())
+                .map(|i| CacheAligned(initial(i)))
+                .collect(),
+            registry,
+        }
+    }
+
+    /// Returns the worker-local value for each thread
+    #[inline]
+    pub fn into_inner(self) -> Vec<T> {
+        self.locals.into_iter().map(|c| c.0).collect()
+    }
+
+    fn current(&self) -> &T {
+        unsafe {
+            let worker_thread = WorkerThread::current();
+            if worker_thread.is_null()
+                || &*(*worker_thread).registry as *const _ != &*self.registry as *const _
+            {
+                panic!("WorkerLocal can only be used on the thread pool it was created on")
+            }
+            &self.locals[(*worker_thread).index].0
+        }
+    }
+}
+
+impl<T> WorkerLocal<Vec<T>> {
+    /// Joins the elements of all the worker locals into one Vec
+    pub fn join(self) -> Vec<T> {
+        self.into_inner().into_iter().flat_map(|v| v).collect()
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for WorkerLocal<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(&self.locals, f)
+    }
+}
+
+impl<T> Deref for WorkerLocal<T> {
+    type Target = T;
+
+    #[inline(always)]
+    fn deref(&self) -> &T {
+        self.current()
+    }
+}
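`WorkerLocal` gives each worker its own cache-line-aligned slot, reachable through `Deref` only from the owning pool's threads. An illustrative sketch (not from the patch; `new` must run where `Registry::current()` resolves to the intended pool):

```rust
use rustc_rayon_core::{scope, WorkerLocal};
use std::cell::Cell;

fn count_per_worker() -> usize {
    let hits: WorkerLocal<Cell<usize>> = WorkerLocal::new(|_index| Cell::new(0));
    scope(|s| {
        for _ in 0..100 {
            let hits = &hits;
            // Each worker bumps only its own slot, so no locking is needed.
            s.spawn(move |_| hits.set(hits.get() + 1));
        }
    });
    hits.into_inner().iter().map(Cell::get).sum()
}
```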
diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml
index bea412f57..196159af5 100644
--- a/rayon-demo/Cargo.toml
+++ b/rayon-demo/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Niko Matsakis <niko@alum.mit.edu>"]
 publish = false
 
 [dependencies]
-rayon = { path = "../" }
+rustc-rayon = { path = "../" }
 cgmath = "0.17"
 docopt = "1"
 fixedbitset = "0.1.5"
diff --git a/rayon-futures/Cargo.toml b/rayon-futures/Cargo.toml
index 4522b2e0b..821f1a623 100644
--- a/rayon-futures/Cargo.toml
+++ b/rayon-futures/Cargo.toml
@@ -10,7 +10,8 @@ documentation = "https://docs.rs/rayon-futures/"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
 categories = ["concurrency"]
+publish = false
 
 [dependencies]
-rayon-core = { version = "1.3", path = "../rayon-core" }
+rustc-rayon-core = { version = "0.3", path = "../rayon-core" }
 futures = "0.1.16"
diff --git a/src/lib.rs b/src/lib.rs
index fc0909d91..2dd148f18 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -80,7 +80,7 @@
 
 extern crate crossbeam_deque;
 extern crate either;
-extern crate rayon_core;
+extern crate rustc_rayon_core as rayon_core;
 
 #[cfg(test)]
 extern crate rand;