Skip to content

Commit cff9efd

Browse files
committed
Add a jobserver proxy to ensure at least one token is always held
1 parent 25cdf1f commit cff9efd

File tree

10 files changed

+149
-29
lines changed

10 files changed

+149
-29
lines changed

compiler/rustc_data_structures/src/jobserver.rs

+89-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use std::sync::{LazyLock, OnceLock};
1+
use std::sync::{Arc, LazyLock, OnceLock};
22

33
pub use jobserver_crate::{Acquired, Client, HelperThread};
44
use jobserver_crate::{FromEnv, FromEnvErrorKind};
5+
use parking_lot::{Condvar, Mutex};
56

67
// We can only call `from_env_ext` once per process
78

@@ -71,10 +72,93 @@ pub fn client() -> Client {
7172
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone()
7273
}
7374

74-
pub fn acquire_thread() {
75-
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok();
75+
struct ProxyData {
76+
/// The number of tokens assigned to threads.
77+
/// If this is 0, a single token is still assigned to this process, but is unused.
78+
used: u16,
79+
80+
/// The number of threads requesting a token
81+
pending: u16,
82+
}
83+
84+
/// This is a jobserver proxy used to ensure that we hold on to at least one token.
85+
pub struct Proxy {
86+
client: Client,
87+
data: Mutex<ProxyData>,
88+
89+
/// Threads which are waiting on a token will wait on this.
90+
wake_pending: Condvar,
91+
92+
helper: OnceLock<HelperThread>,
7693
}
7794

78-
pub fn release_thread() {
79-
GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok();
95+
impl Proxy {
96+
pub fn new() -> Arc<Self> {
97+
let proxy = Arc::new(Proxy {
98+
client: client(),
99+
data: Mutex::new(ProxyData { used: 1, pending: 0 }),
100+
wake_pending: Condvar::new(),
101+
helper: OnceLock::new(),
102+
});
103+
let proxy_ = Arc::clone(&proxy);
104+
let helper = proxy
105+
.client
106+
.clone()
107+
.into_helper_thread(move |token| {
108+
if let Ok(token) = token {
109+
let mut data = proxy_.data.lock();
110+
if data.pending > 0 {
111+
// Give the token to a waiting thread
112+
token.drop_without_releasing();
113+
assert!(data.used > 0);
114+
data.used += 1;
115+
data.pending -= 1;
116+
proxy_.wake_pending.notify_one();
117+
} else {
118+
// The token is no longer needed, drop it.
119+
drop(data);
120+
drop(token);
121+
}
122+
}
123+
})
124+
.expect("failed to create helper thread");
125+
proxy.helper.set(helper).unwrap();
126+
proxy
127+
}
128+
129+
pub fn acquire_thread(&self) {
130+
let mut data = self.data.lock();
131+
132+
if data.used == 0 {
133+
// There was a free token around. This can
134+
// happen when all threads release their token.
135+
assert_eq!(data.pending, 0);
136+
data.used += 1;
137+
} else {
138+
// Request a token from the helper thread. We can't directly use `acquire_raw`
139+
// as we also need to be able to wait for the final token in the process which
140+
// does not get a corresponding `release_raw` call.
141+
self.helper.get().unwrap().request_token();
142+
data.pending += 1;
143+
self.wake_pending.wait(&mut data);
144+
}
145+
}
146+
147+
pub fn release_thread(&self) {
148+
let mut data = self.data.lock();
149+
150+
if data.pending > 0 {
151+
// Give the token to a waiting thread
152+
data.pending -= 1;
153+
self.wake_pending.notify_one();
154+
} else {
155+
data.used -= 1;
156+
157+
// Release the token unless it's the last one in the process
158+
if data.used > 0 {
159+
drop(data);
160+
self.client.release_raw().ok();
161+
}
162+
}
163+
}
80164
}

compiler/rustc_data_structures/src/marker.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ macro_rules! already_send {
5959
// These structures are already `Send`.
6060
already_send!(
6161
[std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File]
62-
[rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler]
63-
[crate::owned_slice::OwnedSlice]
62+
[rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread]
63+
[crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
6464
);
6565

6666
macro_rules! impl_dyn_send {
@@ -134,8 +134,8 @@ macro_rules! already_sync {
134134
already_sync!(
135135
[std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8]
136136
[std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File]
137-
[jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler]
138-
[crate::owned_slice::OwnedSlice]
137+
[jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap]
138+
[crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice]
139139
);
140140

141141
// Use portable AtomicU64 for targets without native 64-bit atomics

compiler/rustc_interface/src/interface.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::Arc;
55
use rustc_ast::{LitKind, MetaItemKind, token};
66
use rustc_codegen_ssa::traits::CodegenBackend;
77
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
8-
use rustc_data_structures::jobserver;
8+
use rustc_data_structures::jobserver::{self, Proxy};
99
use rustc_data_structures::stable_hasher::StableHasher;
1010
use rustc_errors::registry::Registry;
1111
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
@@ -41,6 +41,7 @@ pub struct Compiler {
4141
pub codegen_backend: Box<dyn CodegenBackend>,
4242
pub(crate) override_queries: Option<fn(&Session, &mut Providers)>,
4343
pub(crate) current_gcx: CurrentGcx,
44+
pub(crate) jobserver_proxy: Arc<Proxy>,
4445
}
4546

4647
/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
@@ -415,7 +416,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
415416
config.opts.unstable_opts.threads,
416417
&config.extra_symbols,
417418
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
418-
|current_gcx| {
419+
|current_gcx, jobserver_proxy| {
419420
// The previous `early_dcx` can't be reused here because it doesn't
420421
// impl `Send`. Creating a new one is fine.
421422
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
@@ -511,6 +512,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
511512
codegen_backend,
512513
override_queries: config.override_queries,
513514
current_gcx,
515+
jobserver_proxy,
514516
};
515517

516518
// There are two paths out of `f`.

compiler/rustc_interface/src/passes.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{env, fs, iter};
77

88
use rustc_ast as ast;
99
use rustc_codegen_ssa::traits::CodegenBackend;
10+
use rustc_data_structures::jobserver::Proxy;
1011
use rustc_data_structures::parallel;
1112
use rustc_data_structures::steal::Steal;
1213
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
@@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
841842
dyn for<'tcx> FnOnce(
842843
&'tcx Session,
843844
CurrentGcx,
845+
Arc<Proxy>,
844846
&'tcx OnceLock<GlobalCtxt<'tcx>>,
845847
&'tcx WorkerLocal<Arena<'tcx>>,
846848
&'tcx WorkerLocal<rustc_hir::Arena<'tcx>>,
847849
F,
848850
) -> T,
849-
> = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| {
851+
> = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| {
850852
TyCtxt::create_global_ctxt(
851853
gcx_cell,
852854
sess,
@@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
865867
),
866868
providers.hooks,
867869
current_gcx,
870+
jobserver_proxy,
868871
|tcx| {
869872
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
870873
assert_eq!(feed.key(), LOCAL_CRATE);
@@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
887890
)
888891
});
889892

890-
inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f)
893+
inner(
894+
&compiler.sess,
895+
compiler.current_gcx.clone(),
896+
Arc::clone(&compiler.jobserver_proxy),
897+
&gcx_cell,
898+
&arena,
899+
&hir_arena,
900+
f,
901+
)
891902
}
892903

893904
/// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses.

compiler/rustc_interface/src/util.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
22
use std::path::{Path, PathBuf};
3-
use std::sync::OnceLock;
43
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::{Arc, OnceLock};
55
use std::{env, iter, thread};
66

77
use rustc_ast as ast;
88
use rustc_codegen_ssa::traits::CodegenBackend;
9+
use rustc_data_structures::jobserver::Proxy;
910
use rustc_data_structures::sync;
1011
use rustc_metadata::{DylibError, load_symbol_from_dylib};
1112
use rustc_middle::ty::CurrentGcx;
@@ -113,7 +114,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
113114
})
114115
}
115116

116-
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
117+
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
117118
thread_stack_size: usize,
118119
edition: Edition,
119120
sm_inputs: SourceMapInputs,
@@ -139,7 +140,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
139140
edition,
140141
extra_symbols,
141142
Some(sm_inputs),
142-
|| f(CurrentGcx::new()),
143+
|| f(CurrentGcx::new(), Proxy::new()),
143144
)
144145
})
145146
.unwrap()
@@ -152,7 +153,10 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
152153
})
153154
}
154155

155-
pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
156+
pub(crate) fn run_in_thread_pool_with_globals<
157+
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
158+
R: Send,
159+
>(
156160
thread_builder_diag: &EarlyDiagCtxt,
157161
edition: Edition,
158162
threads: usize,
@@ -162,8 +166,8 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
162166
) -> R {
163167
use std::process;
164168

169+
use rustc_data_structures::defer;
165170
use rustc_data_structures::sync::FromDyn;
166-
use rustc_data_structures::{defer, jobserver};
167171
use rustc_middle::ty::tls;
168172
use rustc_query_impl::QueryCtxt;
169173
use rustc_query_system::query::{QueryContext, break_query_cycles};
@@ -178,22 +182,26 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
178182
edition,
179183
sm_inputs,
180184
extra_symbols,
181-
|current_gcx| {
185+
|current_gcx, jobserver_proxy| {
182186
// Register the thread for use with the `WorkerLocal` type.
183187
registry.register();
184188

185-
f(current_gcx)
189+
f(current_gcx, jobserver_proxy)
186190
},
187191
);
188192
}
189193

190194
let current_gcx = FromDyn::from(CurrentGcx::new());
191195
let current_gcx2 = current_gcx.clone();
192196

197+
let proxy = Proxy::new();
198+
199+
let proxy_ = Arc::clone(&proxy);
200+
let proxy__ = Arc::clone(&proxy);
193201
let builder = rayon_core::ThreadPoolBuilder::new()
194202
.thread_name(|_| "rustc".to_string())
195-
.acquire_thread_handler(jobserver::acquire_thread)
196-
.release_thread_handler(jobserver::release_thread)
203+
.acquire_thread_handler(move || proxy_.acquire_thread())
204+
.release_thread_handler(move || proxy__.release_thread())
197205
.num_threads(threads)
198206
.deadlock_handler(move || {
199207
// On deadlock, creates a new thread and forwards information in thread
@@ -257,7 +265,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
257265
},
258266
// Run `f` on the first thread in the thread pool.
259267
move |pool: &rayon_core::ThreadPool| {
260-
pool.install(|| f(current_gcx.into_inner()))
268+
pool.install(|| f(current_gcx.into_inner(), proxy))
261269
},
262270
)
263271
.unwrap()

compiler/rustc_middle/src/ty/context.rs

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rustc_data_structures::defer;
2121
use rustc_data_structures::fingerprint::Fingerprint;
2222
use rustc_data_structures::fx::FxHashMap;
2323
use rustc_data_structures::intern::Interned;
24+
use rustc_data_structures::jobserver::Proxy;
2425
use rustc_data_structures::profiling::SelfProfilerRef;
2526
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
2627
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
@@ -1438,6 +1439,8 @@ pub struct GlobalCtxt<'tcx> {
14381439
pub(crate) alloc_map: interpret::AllocMap<'tcx>,
14391440

14401441
current_gcx: CurrentGcx,
1442+
1443+
pub jobserver_proxy: Arc<Proxy>,
14411444
}
14421445

14431446
impl<'tcx> GlobalCtxt<'tcx> {
@@ -1642,6 +1645,7 @@ impl<'tcx> TyCtxt<'tcx> {
16421645
query_system: QuerySystem<'tcx>,
16431646
hooks: crate::hooks::Providers,
16441647
current_gcx: CurrentGcx,
1648+
jobserver_proxy: Arc<Proxy>,
16451649
f: impl FnOnce(TyCtxt<'tcx>) -> T,
16461650
) -> T {
16471651
let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1676,6 +1680,7 @@ impl<'tcx> TyCtxt<'tcx> {
16761680
data_layout,
16771681
alloc_map: interpret::AllocMap::new(),
16781682
current_gcx,
1683+
jobserver_proxy,
16791684
});
16801685

16811686
// This is a separate function to work around a crash with parallel rustc (#135870)

compiler/rustc_query_impl/src/plumbing.rs

+6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
use std::num::NonZero;
66

7+
use rustc_data_structures::jobserver::Proxy;
78
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
89
use rustc_data_structures::sync::{DynSend, DynSync};
910
use rustc_data_structures::unord::UnordMap;
@@ -69,6 +70,11 @@ impl<'tcx> HasDepContext for QueryCtxt<'tcx> {
6970
impl<'tcx> QueryContext for QueryCtxt<'tcx> {
7071
type QueryInfo = QueryStackDeferred<'tcx>;
7172

73+
#[inline]
74+
fn jobserver_proxy(&self) -> &Proxy {
75+
&*self.jobserver_proxy
76+
}
77+
7278
#[inline]
7379
fn next_job_id(self) -> QueryJobId {
7480
QueryJobId(

0 commit comments

Comments
 (0)