Skip to content

Commit 4fda8db

Browse files
committed
Rejigger parallel compilation support
This commit reimplements parallel compilation support with a few goals in mind: * Primarily the `jobserver` crate is now used to limit parallelism across build scripts * The `rayon` crate is no longer used to ensure this is a pretty lightweight dependency crate. It's hoped that after this bakes for a bit we may be able to turn this on by default!
1 parent 8589c8b commit 4fda8db

File tree

2 files changed

+125
-17
lines changed

2 files changed

+125
-17
lines changed

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ categories = ["development-tools::build-utils"]
1717
exclude = ["/.travis.yml", "/appveyor.yml"]
1818

1919
[dependencies]
20-
rayon = { version = "1.0", optional = true }
20+
num_cpus = { version = "1.10", optional = true }
21+
jobserver = { version = "0.1.16", optional = true }
2122

2223
[features]
23-
parallel = ["rayon"]
24+
parallel = ["num_cpus", "jobserver"]
2425

2526
[dev-dependencies]
2627
tempdir = "0.3"

src/lib.rs

+122-15
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@
5858
#![allow(deprecated)]
5959
#![deny(missing_docs)]
6060

61-
#[cfg(feature = "parallel")]
62-
extern crate rayon;
63-
6461
use std::collections::HashMap;
6562
use std::env;
6663
use std::ffi::{OsStr, OsString};
@@ -944,22 +941,132 @@ impl Build {
944941
}
945942

946943
#[cfg(feature = "parallel")]
947-
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
948-
use self::rayon::prelude::*;
944+
fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> {
945+
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
946+
use std::sync::Once;
947+
948+
// When compiling objects in parallel we do a few dirty tricks to speed
949+
// things up:
950+
//
951+
// * First is that we use the `jobserver` crate to limit the parallelism
952+
// of this build script. The `jobserver` crate will use a jobserver
953+
// configured by Cargo for build scripts to ensure that parallelism is
954+
// coordinated across C compilations and Rust compilations. Before we
955+
// compile anything we make sure to wait until we acquire a token.
956+
//
957+
// Note that this jobserver is cached globally so we only used one per
958+
// process and only worry about creating it once.
959+
//
960+
// * Next we use a raw `thread::spawn` per thread to actually compile
961+
// objects in parallel. We only actually spawn a thread after we've
962+
// acquired a token to perform some work
963+
//
964+
// * Finally though we want to keep the dependencies of this crate
965+
// pretty light, so we avoid using a safe abstraction like `rayon` and
966+
// instead rely on some bits of `unsafe` code. We know that this stack
967+
// frame persists while everything is compiling so we use all the
968+
// stack-allocated objects without cloning/reallocating. We use a
969+
// transmute to `State` with a `'static` lifetime to persist
970+
// everything we need across the boundary, and the join-on-drop
971+
// semantics of `JoinOnDrop` should ensure that our stack frame is
972+
// alive while threads are alive.
973+
//
974+
// With all that in mind we compile all objects in a loop here, after we
975+
// acquire the appropriate tokens, Once all objects have been compiled
976+
// we join on all the threads and propagate the results of compilation.
977+
//
978+
// Note that as a slight optimization we try to break out as soon as
979+
// possible as soon as any compilation fails to ensure that errors get
980+
// out to the user as fast as possible.
981+
let server = jobserver();
982+
let error = AtomicBool::new(false);
983+
let mut threads = Vec::new();
984+
for obj in objs {
985+
if error.load(SeqCst) {
986+
break;
987+
}
988+
let token = server.acquire()?;
989+
let state = State {
990+
build: self,
991+
obj,
992+
error: &error,
993+
};
994+
let state = unsafe { std::mem::transmute::<State, State<'static>>(state) };
995+
let thread = thread::spawn(|| {
996+
let state: State<'me> = state; // erase the `'static` lifetime
997+
let result = state.build.compile_object(state.obj);
998+
if result.is_err() {
999+
state.error.store(true, SeqCst);
1000+
}
1001+
drop(token); // make sure our jobserver token is released after the compile
1002+
return result;
1003+
});
1004+
threads.push(JoinOnDrop(Some(thread)));
1005+
}
9491006

950-
if let Some(amt) = self.getenv("NUM_JOBS") {
951-
if let Ok(amt) = amt.parse() {
952-
let _ = rayon::ThreadPoolBuilder::new()
953-
.num_threads(amt)
954-
.build_global();
1007+
for mut thread in threads {
1008+
if let Some(thread) = thread.0.take() {
1009+
thread.join().expect("thread should not panic")?;
9551010
}
9561011
}
9571012

958-
// Check for any errors and return the first one found.
959-
objs.par_iter()
960-
.with_max_len(1)
961-
.map(|obj| self.compile_object(obj))
962-
.collect()
1013+
return Ok(());
1014+
1015+
/// Shared state from the parent thread to the child thread. This
1016+
/// package of pointers is temporarily transmuted to a `'static`
1017+
/// lifetime to cross the thread boundary and then once the thread is
1018+
/// running we erase the `'static` to go back to an anonymous lifetime.
1019+
struct State<'a> {
1020+
build: &'a Build,
1021+
obj: &'a Object,
1022+
error: &'a AtomicBool,
1023+
}
1024+
1025+
/// Returns a suitable `jobserver::Client` used to coordinate
1026+
/// parallelism between build scripts.
1027+
fn jobserver() -> &'static jobserver::Client {
1028+
static INIT: Once = Once::new();
1029+
static mut JOBSERVER: Option<jobserver::Client> = None;
1030+
1031+
fn _assert_sync<T: Sync>() {}
1032+
_assert_sync::<jobserver::Client>();
1033+
1034+
unsafe {
1035+
INIT.call_once(|| {
1036+
let server = default_jobserver();
1037+
JOBSERVER = Some(server);
1038+
});
1039+
JOBSERVER.as_ref().unwrap()
1040+
}
1041+
}
1042+
1043+
unsafe fn default_jobserver() -> jobserver::Client {
1044+
// Try to use the environmental jobserver which Cargo typically
1045+
// initializes for us...
1046+
if let Some(client) = jobserver::Client::from_env() {
1047+
return client;
1048+
}
1049+
1050+
// ... but if that fails for whatever reason fall back to the number
1051+
// of cpus on the system or the `NUM_JOBS` env var.
1052+
let mut parallelism = num_cpus::get();
1053+
if let Ok(amt) = env::var("NUM_JOBS") {
1054+
if let Ok(amt) = amt.parse() {
1055+
parallelism = amt;
1056+
}
1057+
}
1058+
jobserver::Client::new(parallelism).expect("failed to create jobserver")
1059+
}
1060+
1061+
struct JoinOnDrop(Option<thread::JoinHandle<Result<(), Error>>>);
1062+
1063+
impl Drop for JoinOnDrop {
1064+
fn drop(&mut self) {
1065+
if let Some(thread) = self.0.take() {
1066+
drop(thread.join());
1067+
}
1068+
}
1069+
}
9631070
}
9641071

9651072
#[cfg(not(feature = "parallel"))]

0 commit comments

Comments
 (0)