-
Notifications
You must be signed in to change notification settings - Fork 507
feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first #878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
NobodyXu
merged 12 commits into
rust-lang:main
from
osiewicz:no_drops_on_implicit_job_token
Oct 19, 2023
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
0c87751
feat: Do not preemptively drop implicit job token.
osiewicz d0a8a49
Move implementation to a separate module and document a bunch
osiewicz ee5dee7
Readd reusable global jobserver connection
osiewicz 3637127
Fix up a comment
osiewicz 4ca7808
Further documentation refinements
osiewicz 1b025d8
Move jobserver func into job_token module
osiewicz 52afaf1
Remove should_return_to_queue member in favor of wrapping pool in an …
osiewicz 3d45841
Make jobserver initialization private in job_token mod
osiewicz 280c673
Remove unnecessary mut on acquire fn
osiewicz d55c382
Use shared global JobTokenServer
osiewicz 985dbae
Change Option to MaybeUninit
osiewicz 65ab371
Convert internal channel to use Result and do not panic in helper thread
osiewicz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
use jobserver::{Acquired, Client, HelperThread}; | ||
use std::{ | ||
env, | ||
mem::MaybeUninit, | ||
sync::{ | ||
mpsc::{self, Receiver, Sender}, | ||
Once, | ||
}, | ||
}; | ||
|
||
pub(crate) struct JobToken { | ||
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. | ||
/// Both are valid values to put into queue. | ||
token: Option<Acquired>, | ||
/// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead | ||
/// of storing it back in the pool - see [`Self::forget()`] function for that. | ||
pool: Option<Sender<Option<Result<Acquired, crate::Error>>>>, | ||
} | ||
|
||
impl Drop for JobToken { | ||
fn drop(&mut self) { | ||
if let Some(pool) = &self.pool { | ||
// Always send back an Ok() variant as we know that the acquisition for this token has succeeded. | ||
let _ = pool.send(self.token.take().map(|token| Ok(token))); | ||
} | ||
} | ||
} | ||
|
||
impl JobToken { | ||
/// Ensure that this token is not put back into queue once it's dropped. | ||
/// This also leads to releasing it sooner for other processes to use, | ||
/// which is a correct thing to do once it is known that there won't be | ||
/// any more token acquisitions. | ||
pub(crate) fn forget(&mut self) { | ||
self.pool.take(); | ||
} | ||
} | ||
|
||
/// A thin wrapper around jobserver's Client. | ||
/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse | ||
/// our own implicit token assigned for this build script. This struct manages that and | ||
/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. | ||
/// Furthermore, instead of giving up job tokens, it keeps them around | ||
/// for reuse if we know we're going to request another token after freeing the current one. | ||
pub(crate) struct JobTokenServer { | ||
helper: HelperThread, | ||
tx: Sender<Option<Result<Acquired, crate::Error>>>, | ||
rx: Receiver<Option<Result<Acquired, crate::Error>>>, | ||
} | ||
|
||
impl JobTokenServer { | ||
pub(crate) fn new() -> &'static Self { | ||
jobserver() | ||
} | ||
fn new_inner(client: Client) -> Result<Self, crate::Error> { | ||
let (tx, rx) = mpsc::channel(); | ||
// Push the implicit token. Since JobTokens only give back what they got, | ||
// there should be at most one global implicit token in the wild. | ||
tx.send(None).unwrap(); | ||
let pool = tx.clone(); | ||
let helper = client.into_helper_thread(move |acq| { | ||
let _ = pool.send(Some(acq.map_err(|e| e.into()))); | ||
})?; | ||
Ok(Self { helper, tx, rx }) | ||
} | ||
|
||
pub(crate) fn acquire(&self) -> Result<JobToken, crate::Error> { | ||
let token = if let Ok(token) = self.rx.try_recv() { | ||
// Opportunistically check if there's a token that can be reused. | ||
token | ||
} else { | ||
// Cold path, request a token and block | ||
self.helper.request_token(); | ||
self.rx.recv().unwrap() | ||
}; | ||
let token = if let Some(token) = token { | ||
Some(token?) | ||
} else { | ||
None | ||
}; | ||
Ok(JobToken { | ||
token, | ||
pool: Some(self.tx.clone()), | ||
}) | ||
} | ||
} | ||
|
||
/// Returns a suitable `JobTokenServer` used to coordinate | ||
/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures | ||
/// that only one implicit job token is used in the wild. | ||
/// Having multiple separate job token servers would lead to each of them assuming that they have control | ||
/// over the implicit job token. | ||
/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most | ||
/// one implicit job token in the wild. | ||
fn jobserver() -> &'static JobTokenServer { | ||
static INIT: Once = Once::new(); | ||
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit(); | ||
|
||
fn _assert_sync<T: Sync>() {} | ||
_assert_sync::<jobserver::Client>(); | ||
|
||
unsafe { | ||
INIT.call_once(|| { | ||
let server = default_jobserver(); | ||
JOBSERVER = MaybeUninit::new( | ||
JobTokenServer::new_inner(server).expect("Job server initialization failed"), | ||
); | ||
}); | ||
// Poor man's assume_init_ref, as that'd require a MSRV of 1.55. | ||
&*JOBSERVER.as_ptr() | ||
} | ||
} | ||
|
||
unsafe fn default_jobserver() -> jobserver::Client { | ||
// Try to use the environmental jobserver which Cargo typically | ||
// initializes for us... | ||
if let Some(client) = jobserver::Client::from_env() { | ||
return client; | ||
} | ||
|
||
// ... but if that fails for whatever reason select something | ||
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's | ||
// configured by Cargo) and otherwise just fall back to a | ||
// semi-reasonable number. Note that we could use `num_cpus` here | ||
// but it's an extra dependency that will almost never be used, so | ||
// it's generally not too worth it. | ||
let mut parallelism = 4; | ||
if let Ok(amt) = env::var("NUM_JOBS") { | ||
if let Ok(amt) = amt.parse() { | ||
parallelism = amt; | ||
} | ||
} | ||
|
||
// If we create our own jobserver then be sure to reserve one token | ||
// for ourselves. | ||
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); | ||
client.acquire_raw().expect("failed to acquire initial"); | ||
return client; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.