Skip to content

Commit cbf25a9

Browse files
committed
Add a GNU make jobserver implementation to Cargo
This commit adds a GNU make jobserver implementation to Cargo, both as a client of existing jobservers and also a creator of new jobservers. The jobserver is actually just an IPC semaphore which manifests itself as a pipe with N bytes of tokens on Unix and a literal IPC semaphore on Windows. The rough protocol is then if you want to run a job you read acquire the semaphore (read a byte on Unix or wait on the semaphore on Windows) and then you release it when you're done. All the hairy details of the jobserver implementation are housed in the `jobserver` crate on crates.io instead of Cargo. This should hopefully make it much easier for the compiler to also share a jobserver implementation eventually. The main tricky bit here is that on Unix and Windows acquiring a jobserver token will block the calling thread. We need to either wait for a running job to exit or to acquire a new token when we want to spawn a new job. To handle this the current implementation spawns a helper thread that does the blocking and sends a message back to Cargo when it receives a token. It's a little trickier with shutting down this thread gracefully as well but more details can be found in the `jobserver` crate. Unfortunately crates are unlikely to see an immediate benefit of this once implemented. Most crates are run with a manual `make -jN` and this overrides the jobserver in the environment, creating a new jobserver in the sub-make. If the `-jN` argument is removed, however, then `make` will share Cargo's jobserver and properly limit parallelism. Closes #1744
1 parent 03c0a41 commit cbf25a9

14 files changed

+376
-49
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ fs2 = "0.4"
2929
git2 = "0.6"
3030
git2-curl = "0.7"
3131
glob = "0.2"
32+
jobserver = "0.1.2"
3233
libc = "0.2"
3334
libgit2-sys = "0.6"
3435
log = "0.3"
@@ -37,8 +38,8 @@ rustc-serialize = "0.3"
3738
semver = { version = "0.7.0", features = ["serde"] }
3839
serde = "1.0"
3940
serde_derive = "1.0"
40-
serde_json = "1.0"
4141
serde_ignored = "0.0.3"
42+
serde_json = "1.0"
4243
shell-escape = "0.1"
4344
tar = { version = "0.4", default-features = false }
4445
tempdir = "0.3"

src/cargo/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ extern crate flate2;
1616
extern crate fs2;
1717
extern crate git2;
1818
extern crate glob;
19+
extern crate jobserver;
1920
extern crate libc;
2021
extern crate libgit2_sys;
2122
extern crate num_cpus;

src/cargo/ops/cargo_clean.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub fn clean(ws: &Workspace, opts: &CleanOptions) -> CargoResult<()> {
3737
host_triple: host_triple,
3838
requested_target: opts.target.map(|s| s.to_owned()),
3939
release: opts.release,
40+
jobs: 1,
4041
..BuildConfig::default()
4142
},
4243
profiles)?;

src/cargo/ops/cargo_compile.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,11 @@ fn scrape_build_config(config: &Config,
639639
jobs: Option<u32>,
640640
target: Option<String>)
641641
-> CargoResult<ops::BuildConfig> {
642+
if jobs.is_some() && config.jobserver_from_env().is_some() {
643+
config.shell().warn("a `-j` argument was passed to Cargo but Cargo is \
644+
also configured with an external jobserver in \
645+
its environment, ignoring the `-j` parameter")?;
646+
}
642647
let cfg_jobs = match config.get_i64("build.jobs")? {
643648
Some(v) => {
644649
if v.val <= 0 {

src/cargo/ops/cargo_rustc/context.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::path::{Path, PathBuf};
88
use std::str::{self, FromStr};
99
use std::sync::Arc;
1010

11+
use jobserver::Client;
12+
1113
use core::{Package, PackageId, PackageSet, Resolve, Target, Profile};
1214
use core::{TargetKind, Profiles, Dependency, Workspace};
1315
use core::dependency::Kind as DepKind;
@@ -43,6 +45,7 @@ pub struct Context<'a, 'cfg: 'a> {
4345
pub build_scripts: HashMap<Unit<'a>, Arc<BuildScripts>>,
4446
pub links: Links<'a>,
4547
pub used_in_plugin: HashSet<Unit<'a>>,
48+
pub jobserver: Client,
4649

4750
host: Layout,
4851
target: Option<Layout>,
@@ -94,6 +97,21 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
9497
config.rustc()?.verbose_version.contains("-dev");
9598
let incremental_enabled = incremental_enabled && is_nightly;
9699

100+
// Load up the jobserver that we'll use to manage our parallelism. This
101+
// is the same as the GNU make implementation of a jobserver, and
102+
// intentionally so! It's hoped that we can interact with GNU make and
103+
// all share the same jobserver.
104+
//
105+
// Note that if we don't have a jobserver in our environment then we
106+
// create our own, and we create it with `n-1` tokens because one token
107+
// is ourself, a running process.
108+
let jobserver = match config.jobserver_from_env() {
109+
Some(c) => c.clone(),
110+
None => Client::new(build_config.jobs as usize - 1).chain_err(|| {
111+
"failed to create jobserver"
112+
})?,
113+
};
114+
97115
Ok(Context {
98116
ws: ws,
99117
host: host_layout,
@@ -114,6 +132,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
114132
links: Links::new(),
115133
used_in_plugin: HashSet::new(),
116134
incremental_enabled: incremental_enabled,
135+
jobserver: jobserver,
117136
})
118137
}
119138

src/cargo/ops/cargo_rustc/custom_build.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ fn build_work<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>)
115115
.env("PROFILE", if cx.build_config.release { "release" } else { "debug" })
116116
.env("HOST", cx.host_triple())
117117
.env("RUSTC", &cx.config.rustc()?.path)
118-
.env("RUSTDOC", &*cx.config.rustdoc()?);
118+
.env("RUSTDOC", &*cx.config.rustdoc()?)
119+
.inherit_jobserver(&cx.jobserver);
119120

120121
if let Some(links) = unit.pkg.manifest().links() {
121122
cmd.env("CARGO_MANIFEST_LINKS", links);

src/cargo/ops/cargo_rustc/job_queue.rs

Lines changed: 92 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
use std::collections::HashSet;
22
use std::collections::hash_map::HashMap;
33
use std::fmt;
4-
use std::io::Write;
4+
use std::io::{self, Write};
5+
use std::mem;
56
use std::sync::mpsc::{channel, Sender, Receiver};
67

78
use crossbeam::{self, Scope};
9+
use jobserver::{Acquired, HelperThread};
810
use term::color::YELLOW;
911

1012
use core::{PackageId, Target, Profile};
1113
use util::{Config, DependencyQueue, Fresh, Dirty, Freshness};
12-
use util::{CargoResult, ProcessBuilder, profile, internal};
14+
use util::{CargoResult, ProcessBuilder, profile, internal, CargoResultExt};
1315
use {handle_error};
1416

1517
use super::{Context, Kind, Unit};
@@ -21,10 +23,9 @@ use super::job::Job;
2123
/// actual compilation step of each package. Packages enqueue units of work and
2224
/// then later on the entire graph is processed and compiled.
2325
pub struct JobQueue<'a> {
24-
jobs: usize,
2526
queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
26-
tx: Sender<(Key<'a>, Message)>,
27-
rx: Receiver<(Key<'a>, Message)>,
27+
tx: Sender<Message<'a>>,
28+
rx: Receiver<Message<'a>>,
2829
active: usize,
2930
pending: HashMap<Key<'a>, PendingBuild>,
3031
compiled: HashSet<&'a PackageId>,
@@ -51,36 +52,35 @@ struct Key<'a> {
5152
}
5253

5354
pub struct JobState<'a> {
54-
tx: Sender<(Key<'a>, Message)>,
55-
key: Key<'a>,
55+
tx: Sender<Message<'a>>,
5656
}
5757

58-
enum Message {
58+
enum Message<'a> {
5959
Run(String),
6060
Stdout(String),
6161
Stderr(String),
62-
Finish(CargoResult<()>),
62+
Token(io::Result<Acquired>),
63+
Finish(Key<'a>, CargoResult<()>),
6364
}
6465

6566
impl<'a> JobState<'a> {
6667
pub fn running(&self, cmd: &ProcessBuilder) {
67-
let _ = self.tx.send((self.key, Message::Run(cmd.to_string())));
68+
let _ = self.tx.send(Message::Run(cmd.to_string()));
6869
}
6970

7071
pub fn stdout(&self, out: &str) {
71-
let _ = self.tx.send((self.key, Message::Stdout(out.to_string())));
72+
let _ = self.tx.send(Message::Stdout(out.to_string()));
7273
}
7374

7475
pub fn stderr(&self, err: &str) {
75-
let _ = self.tx.send((self.key, Message::Stderr(err.to_string())));
76+
let _ = self.tx.send(Message::Stderr(err.to_string()));
7677
}
7778
}
7879

7980
impl<'a> JobQueue<'a> {
8081
pub fn new<'cfg>(cx: &Context<'a, 'cfg>) -> JobQueue<'a> {
8182
let (tx, rx) = channel();
8283
JobQueue {
83-
jobs: cx.jobs() as usize,
8484
queue: DependencyQueue::new(),
8585
tx: tx,
8686
rx: rx,
@@ -113,56 +113,100 @@ impl<'a> JobQueue<'a> {
113113
pub fn execute(&mut self, cx: &mut Context) -> CargoResult<()> {
114114
let _p = profile::start("executing the job graph");
115115

116+
// We need to give a handle to the send half of our message queue to the
117+
// jobserver helper thread. Unfortunately though we need the handle to be
118+
// `'static` as that's typically what's required when spawning a
119+
// thread!
120+
//
121+
// To work around this we transmute the `Sender` to a static lifetime.
122+
// We're only sending "longer living" messages and we should also
123+
// destroy all references to the channel before this function exits as
124+
// the destructor for the `helper` object will ensure the associated
125+
// thread is no longer running.
126+
//
127+
// As a result, this `transmute` to a longer lifetime should be safe in
128+
// practice.
129+
let tx = self.tx.clone();
130+
let tx = unsafe {
131+
mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx)
132+
};
133+
let helper = cx.jobserver.clone().into_helper_thread(move |token| {
134+
drop(tx.send(Message::Token(token)));
135+
}).chain_err(|| {
136+
"failed to create helper thread for jobserver management"
137+
})?;
138+
116139
crossbeam::scope(|scope| {
117-
self.drain_the_queue(cx, scope)
140+
self.drain_the_queue(cx, scope, &helper)
118141
})
119142
}
120143

121-
fn drain_the_queue(&mut self, cx: &mut Context, scope: &Scope<'a>)
144+
fn drain_the_queue(&mut self,
145+
cx: &mut Context,
146+
scope: &Scope<'a>,
147+
jobserver_helper: &HelperThread)
122148
-> CargoResult<()> {
123149
use std::time::Instant;
124150

151+
let mut tokens = Vec::new();
125152
let mut queue = Vec::new();
126153
trace!("queue: {:#?}", self.queue);
127154

128155
// Iteratively execute the entire dependency graph. Each turn of the
129156
// loop starts out by scheduling as much work as possible (up to the
130-
// maximum number of parallel jobs). A local queue is maintained
131-
// separately from the main dependency queue as one dequeue may actually
132-
// dequeue quite a bit of work (e.g. 10 binaries in one project).
157+
// maximum number of parallel jobs we have tokens for). A local queue
158+
// is maintained separately from the main dependency queue as one
159+
// dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
160+
// in one project).
133161
//
134162
// After a job has finished we update our internal state if it was
135163
// successful and otherwise wait for pending work to finish if it failed
136164
// and then immediately return.
137165
let mut error = None;
138166
let start_time = Instant::now();
139167
loop {
140-
while error.is_none() && self.active < self.jobs {
141-
if !queue.is_empty() {
142-
let (key, job, fresh) = queue.remove(0);
143-
self.run(key, fresh, job, cx.config, scope)?;
144-
} else if let Some((fresh, key, jobs)) = self.queue.dequeue() {
145-
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
146-
f.combine(fresh)
147-
});
148-
self.pending.insert(key, PendingBuild {
149-
amt: jobs.len(),
150-
fresh: total_fresh,
151-
});
152-
queue.extend(jobs.into_iter().map(|(job, f)| {
153-
(key, job, f.combine(fresh))
154-
}));
155-
} else {
156-
break
168+
// Dequeue as much work as we can, learning about everything
169+
// possible that can run. Note that this is also the point where we
170+
// start requesting job tokens. Each job after the first needs to
171+
// request a token.
172+
while let Some((fresh, key, jobs)) = self.queue.dequeue() {
173+
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
174+
f.combine(fresh)
175+
});
176+
self.pending.insert(key, PendingBuild {
177+
amt: jobs.len(),
178+
fresh: total_fresh,
179+
});
180+
for (job, f) in jobs {
181+
queue.push((key, job, f.combine(fresh)));
182+
if self.active + queue.len() > 0 {
183+
jobserver_helper.request_token();
184+
}
157185
}
158186
}
187+
188+
// Now that we've learned of all possible work that we can execute
189+
// try to spawn it so long as we've got a jobserver token which says
190+
// we're able to perform some parallel work.
191+
while error.is_none() && self.active < tokens.len() + 1 && !queue.is_empty() {
192+
let (key, job, fresh) = queue.remove(0);
193+
self.run(key, fresh, job, cx.config, scope)?;
194+
}
195+
196+
// If after all that we're not actually running anything then we're
197+
// done!
159198
if self.active == 0 {
160199
break
161200
}
162201

163-
let (key, msg) = self.rx.recv().unwrap();
202+
// And finally, before we block waiting for the next event, drop any
203+
// excess tokens we may have accidentally acquired. Due to how our
204+
// jobserver interface is architected we may acquire a token that we
205+
// don't actually use, and if this happens just relinquish it back
206+
// to the jobserver itself.
207+
tokens.truncate(self.active - 1);
164208

165-
match msg {
209+
match self.rx.recv().unwrap() {
166210
Message::Run(cmd) => {
167211
cx.config.shell().verbose(|c| c.status("Running", &cmd))?;
168212
}
@@ -176,9 +220,13 @@ impl<'a> JobQueue<'a> {
176220
writeln!(cx.config.shell().err(), "{}", err)?;
177221
}
178222
}
179-
Message::Finish(result) => {
223+
Message::Finish(key, result) => {
180224
info!("end: {:?}", key);
181225
self.active -= 1;
226+
if self.active > 0 {
227+
assert!(tokens.len() > 0);
228+
drop(tokens.pop());
229+
}
182230
match result {
183231
Ok(()) => self.finish(key, cx)?,
184232
Err(e) => {
@@ -198,6 +246,11 @@ impl<'a> JobQueue<'a> {
198246
}
199247
}
200248
}
249+
Message::Token(acquired_token) => {
250+
tokens.push(acquired_token.chain_err(|| {
251+
"failed to acquire jobserver token"
252+
})?);
253+
}
201254
}
202255
}
203256

@@ -244,9 +297,8 @@ impl<'a> JobQueue<'a> {
244297
scope.spawn(move || {
245298
let res = job.run(fresh, &JobState {
246299
tx: my_tx.clone(),
247-
key: key,
248300
});
249-
my_tx.send((key, Message::Finish(res))).unwrap();
301+
my_tx.send(Message::Finish(key, res)).unwrap();
250302
});
251303

252304
// Print out some nice progress information

0 commit comments

Comments
 (0)