Skip to content

Commit 6e23095

Browse files
committed
Auto merge of rust-lang#140145 - Zoxc:job-server-proxy, r=SparrowLii
Add a jobserver proxy to ensure at least one token is always held This adds a jobserver proxy to ensure at least one token is always held by `rustc`. Currently with `-Z threads` `rustc` can temporarily give up all its tokens, causing `cargo` to spawn additional `rustc` instances beyond the job limit. The current behavior causes an issue with `cargo fix` which has a global lock preventing concurrent `rustc` instances, but it also holds a jobserver token, causing a deadlock when `rustc` gives up its token. That is fixed by this PR. Fixes rust-lang#67385. Fixes rust-lang#133873. Fixes rust-lang#140093.
2 parents 0c33fe2 + 08b27ff commit 6e23095

File tree

10 files changed

+156
-29
lines changed

10 files changed

+156
-29
lines changed

compiler/rustc_data_structures/src/jobserver.rs

+89-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use std::sync::{LazyLock, OnceLock};
1+
use std::sync::{Arc, LazyLock, OnceLock};
22

33
pub use jobserver_crate::{Acquired, Client, HelperThread};
44
use jobserver_crate::{FromEnv, FromEnvErrorKind};
5+
use parking_lot::{Condvar, Mutex};
56

67
// We can only call `from_env_ext` once per process
78

@@ -71,10 +72,93 @@ pub fn client() -> Client {
7172
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
7273
}
7374

74-
pub fn acquire_thread() {
75-
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
75+
struct ProxyData {
76+
/// The number of tokens assigned to threads.
77+
/// If this is 0, a single token is still assigned to this process, but is unused.
78+
used: u16,
79+
80+
/// The number of threads requesting a token
81+
pending: u16,
82+
}
83+
84+
/// This is a jobserver proxy used to ensure that we hold on to at least one token.
85+
pub struct Proxy {
86+
client: Client,
87+
data: Mutex<ProxyData>,
88+
89+
/// Threads which are waiting on a token will wait on this.
90+
wake_pending: Condvar,
91+
92+
helper: OnceLock<HelperThread>,
7693
}
7794

78-
pub fn release_thread() {
79-
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
95+
impl Proxy {
96+
pub fn new() -> Arc<Self> {
97+
let proxy = Arc::new(Proxy {
98+
client: client(),
99+
data: Mutex::new(ProxyData { used: 1, pending: 0 }),
100+
wake_pending: Condvar::new(),
101+
helper: OnceLock::new(),
102+
});
103+
let proxy_ = Arc::clone(&proxy);
104+
let helper = proxy
105+
.client
106+
.clone()
107+
.into_helper_thread(move |token| {
108+
if let Ok(token) = token {
109+
let mut data = proxy_.data.lock();
110+
if data.pending > 0 {
111+
// Give the token to a waiting thread
112+
token.drop_without_releasing();
113+
assert!(data.used > 0);
114+
data.used += 1;
115+
data.pending -= 1;
116+
proxy_.wake_pending.notify_one();
117+
} else {
118+
// The token is no longer needed, drop it.
119+
drop(data);
120+
drop(token);
121+
}
122+
}
123+
})
124+
.expect("failed to create helper thread");
125+
proxy.helper.set(helper).unwrap();
126+
proxy
127+
}
128+
129+
pub fn acquire_thread(&self) {
130+
let mut data = self.data.lock();
131+
132+
if data.used == 0 {
133+
// There was a free token around. This can
134+
// happen when all threads release their token.
135+
assert_eq!(data.pending, 0);
136+
data.used += 1;
137+
} else {
138+
// Request a token from the helper thread. We can't directly use `acquire_raw`
139+
// as we also need to be able to wait for the final token in the process which
140+
// does not get a corresponding `release_raw` call.
141+
self.helper.get().unwrap().request_token();
142+
data.pending += 1;
143+
self.wake_pending.wait(&mut data);
144+
}
145+
}
146+
147+
pub fn release_thread(&self) {
148+
let mut data = self.data.lock();
149+
150+
if data.pending > 0 {
151+
// Give the token to a waiting thread
152+
data.pending -= 1;
153+
self.wake_pending.notify_one();
154+
} else {
155+
data.used -= 1;
156+
157+
// Release the token unless it's the last one in the process
158+
if data.used > 0 {
159+
drop(data);
160+
self.client.release_raw().ok();
161+
}
162+
}
163+
}
80164
}

compiler/rustc_data_structures/src/marker.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ macro_rules! already_send {
5959
// These structures are already `Send`.
6060
already_send!(
6161
[std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
62-
[rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
63-
[crate::owned_slice::OwnedSlice]
62+
[rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
63+
[crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
6464
);
6565

6666
macro_rules! impl_dyn_send {
@@ -134,8 +134,8 @@ macro_rules! already_sync {
134134
already_sync!(
135135
[std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
136136
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
137-
[jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
138-
[crate::owned_slice::OwnedSlice]
137+
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
138+
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
139139
);
140140

141141
// Use portable AtomicU64 for targets without native 64-bit atomics

compiler/rustc_interface/src/interface.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
use rustc_ast::{LitKind, MetaItemKind, token};
66
use rustc_codegen_ssa::traits::CodegenBackend;
77
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
8-
use rustc_data_structures::jobserver;
8+
use rustc_data_structures::jobserver::{self, Proxy};
99
use rustc_data_structures::stable_hasher::StableHasher;
1010
use rustc_errors::registry::Registry;
1111
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
@@ -40,7 +40,12 @@ pub struct Compiler {
4040
pub sess: Session,
4141
pub codegen_backend: Box<dyn CodegenBackend>,
4242
pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
43+
44+
/// A reference to the current `GlobalCtxt` which we pass on to `GlobalCtxt`.
4345
pub(crate) current_gcx: CurrentGcx,
46+
47+
/// A jobserver reference which we pass on to `GlobalCtxt`.
48+
pub(crate) jobserver_proxy: Arc<Proxy>,
4449
}
4550

4651
/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
@@ -415,7 +420,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
415420
config.opts.unstable_opts.threads,
416421
&config.extra_symbols,
417422
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
418-
|current_gcx| {
423+
|current_gcx, jobserver_proxy| {
419424
// The previous `early_dcx` can't be reused here because it doesn't
420425
// impl `Send`. Creating a new one is fine.
421426
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
@@ -511,6 +516,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
511516
codegen_backend,
512517
override_queries: config.override_queries,
513518
current_gcx,
519+
jobserver_proxy,
514520
};
515521

516522
// There are two paths out of `f`.

compiler/rustc_interface/src/passes.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{env, fs, iter};
77

88
use rustc_ast as ast;
99
use rustc_codegen_ssa::traits::CodegenBackend;
10+
use rustc_data_structures::jobserver::Proxy;
1011
use rustc_data_structures::parallel;
1112
use rustc_data_structures::steal::Steal;
1213
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
@@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
841842
dyn for<'tcx> FnOnce(
842843
&'tcx Session,
843844
CurrentGcx,
845+
Arc<Proxy>,
844846
&'tcx OnceLock<GlobalCtxt<'tcx>>,
845847
&'tcx WorkerLocal<Arena<'tcx>>,
846848
&'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
847849
F,
848850
) -> T,
849-
> = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
851+
> = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| {
850852
TyCtxt::create_global_ctxt(
851853
gcx_cell,
852854
sess,
@@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
865867
),
866868
providers.hooks,
867869
current_gcx,
870+
jobserver_proxy,
868871
|tcx| {
869872
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
870873
assert_eq!(feed.key(), LOCAL_CRATE);
@@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
887890
)
888891
});
889892

890-
inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
893+
inner(
894+
&compiler.sess,
895+
compiler.current_gcx.clone(),
896+
Arc::clone(&compiler.jobserver_proxy),
897+
&gcx_cell,
898+
&arena,
899+
&hir_arena,
900+
f,
901+
)
891902
}
892903

893904
/// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.

compiler/rustc_interface/src/util.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
22
use std::path::{Path, PathBuf};
3-
use std::sync::OnceLock;
43
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::{Arc, OnceLock};
55
use std::{env, iter, thread};
66

77
use rustc_ast as ast;
88
use rustc_codegen_ssa::traits::CodegenBackend;
9+
use rustc_data_structures::jobserver::Proxy;
910
use rustc_data_structures::sync;
1011
use rustc_metadata::{DylibError, load_symbol_from_dylib};
1112
use rustc_middle::ty::CurrentGcx;
@@ -124,7 +125,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
124125
})
125126
}
126127

127-
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
128+
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
128129
thread_stack_size: usize,
129130
edition: Edition,
130131
sm_inputs: SourceMapInputs,
@@ -150,7 +151,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
150151
edition,
151152
extra_symbols,
152153
Some(sm_inputs),
153-
|| f(CurrentGcx::new()),
154+
|| f(CurrentGcx::new(), Proxy::new()),
154155
)
155156
})
156157
.unwrap()
@@ -163,7 +164,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
163164
})
164165
}
165166

166-
pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
167+
pub(crate) fn run_in_thread_pool_with_globals<
168+
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
169+
R: Send,
170+
>(
167171
thread_builder_diag: &EarlyDiagCtxt,
168172
edition: Edition,
169173
threads: usize,
@@ -173,8 +177,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
173177
) -> R {
174178
use std::process;
175179

180+
use rustc_data_structures::defer;
176181
use rustc_data_structures::sync::FromDyn;
177-
use rustc_data_structures::{defer, jobserver};
178182
use rustc_middle::ty::tls;
179183
use rustc_query_impl::QueryCtxt;
180184
use rustc_query_system::query::{QueryContext, break_query_cycles};
@@ -189,22 +193,26 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
189193
edition,
190194
sm_inputs,
191195
extra_symbols,
192-
|current_gcx| {
196+
|current_gcx, jobserver_proxy| {
193197
// Register the thread for use with the `WorkerLocal` type.
194198
registry.register();
195199

196-
f(current_gcx)
200+
f(current_gcx, jobserver_proxy)
197201
},
198202
);
199203
}
200204

201205
let current_gcx = FromDyn::from(CurrentGcx::new());
202206
let current_gcx2 = current_gcx.clone();
203207

208+
let proxy = Proxy::new();
209+
210+
let proxy_ = Arc::clone(&proxy);
211+
let proxy__ = Arc::clone(&proxy);
204212
let builder = rayon_core::ThreadPoolBuilder::new()
205213
.thread_name(|_| "rustc".to_string())
206-
.acquire_thread_handler(jobserver::acquire_thread)
207-
.release_thread_handler(jobserver::release_thread)
214+
.acquire_thread_handler(move || proxy_.acquire_thread())
215+
.release_thread_handler(move || proxy__.release_thread())
208216
.num_threads(threads)
209217
.deadlock_handler(move || {
210218
// On deadlock, creates a new thread and forwards information in thread
@@ -268,7 +276,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
268276
},
269277
// Run `f` on the first thread in the thread pool.
270278
move |pool: &rayon_core::ThreadPool| {
271-
pool.install(|| f(current_gcx.into_inner()))
279+
pool.install(|| f(current_gcx.into_inner(), proxy))
272280
},
273281
)
274282
.unwrap()

compiler/rustc_middle/src/ty/context.rs

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rustc_data_structures::defer;
2121
use rustc_data_structures::fingerprint::Fingerprint;
2222
use rustc_data_structures::fx::FxHashMap;
2323
use rustc_data_structures::intern::Interned;
24+
use rustc_data_structures::jobserver::Proxy;
2425
use rustc_data_structures::profiling::SelfProfilerRef;
2526
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
2627
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
@@ -1441,6 +1442,9 @@ pub struct GlobalCtxt<'tcx> {
14411442
pub(crate) alloc_map: interpret::AllocMap<'tcx>,
14421443

14431444
current_gcx: CurrentGcx,
1445+
1446+
/// A jobserver reference used to release then acquire a token while waiting on a query.
1447+
pub jobserver_proxy: Arc<Proxy>,
14441448
}
14451449

14461450
impl<'tcx> GlobalCtxt<'tcx> {
@@ -1645,6 +1649,7 @@ impl<'tcx> TyCtxt<'tcx> {
16451649
query_system: QuerySystem<'tcx>,
16461650
hooks: crate::hooks::Providers,
16471651
current_gcx: CurrentGcx,
1652+
jobserver_proxy: Arc<Proxy>,
16481653
f: impl FnOnce(TyCtxt<'tcx>) -> T,
16491654
) -> T {
16501655
let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1679,6 +1684,7 @@ impl<'tcx> TyCtxt<'tcx> {
16791684
data_layout,
16801685
alloc_map: interpret::AllocMap::new(),
16811686
current_gcx,
1687+
jobserver_proxy,
16821688
});
16831689

16841690
// This is a separate function to work around a crash with parallel rustc (#135870)

compiler/rustc_query_impl/src/plumbing.rs

+6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
use std::num::NonZero;
66

7+
use rustc_data_structures::jobserver::Proxy;
78
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
89
use rustc_data_structures::sync::{DynSend, DynSync};
910
use rustc_data_structures::unord::UnordMap;
@@ -69,6 +70,11 @@ impl<'tcx> HasDepContext for QueryCtxt<'tcx> {
6970
impl<'tcx> QueryContext for QueryCtxt<'tcx> {
7071
type QueryInfo = QueryStackDeferred<'tcx>;
7172

73+
#[inline]
74+
fn jobserver_proxy(&self) -> &Proxy {
75+
&*self.jobserver_proxy
76+
}
77+
7278
#[inline]
7379
fn next_job_id(self) -> QueryJobId {
7480
QueryJobId(

0 commit comments

Comments
 (0)