Skip to content

Commit 79cd16b

Browse files
committed
process_wrapper: make process_wrapper not use extra threads
1 parent cdd0d66 commit 79cd16b

File tree

3 files changed

+92
-50
lines changed

3 files changed

+92
-50
lines changed

util/process_wrapper/main.rs

+9-14
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@ mod util;
2121
use std::fs::{copy, OpenOptions};
2222
use std::io;
2323
use std::process::{exit, Command, ExitStatus, Stdio};
24-
use std::sync::mpsc::sync_channel;
25-
26-
use output::process_output;
2724

2825
use crate::options::options;
26+
use crate::output::{process_output, LineOutput};
2927

3028
#[cfg(windows)]
3129
fn status_code(status: ExitStatus, was_killed: bool) -> i32 {
@@ -92,27 +90,24 @@ fn main() {
9290
let child_stderr = Box::new(child.stderr.take().unwrap());
9391

9492
let mut was_killed = false;
95-
let stderr_thread = if !opts.rustc_quit_on_rmeta {
93+
let result = if !opts.rustc_quit_on_rmeta {
9694
// Process output normally by forwarding stderr
97-
process_output(child_stderr, stderr, Some)
95+
process_output(child_stderr, stderr, LineOutput::Message)
9896
} else {
9997
let format = opts.rustc_output_format;
100-
// Process json rustc output and kill the subprocess when we get a signal
101-
// that we emitted a metadata file.
102-
// This receiver will block until a corresponding send happens.
103-
let (stop_sender, stop) = sync_channel(0);
104-
let thread = process_output(child_stderr, stderr, move |line| {
105-
rustc::stop_on_rmeta_completion(line, format, &stop_sender)
98+
let mut kill = false;
99+
let result = process_output(child_stderr, stderr, |line| {
100+
rustc::stop_on_rmeta_completion(line, format, &mut kill)
106101
});
107-
if stop.recv().is_ok() {
102+
if kill {
108103
// If recv returns Ok(), a signal was sent in this channel so we should terminate the child process.
109104
// We can safely ignore the Result from kill() as we don't care if the process already terminated.
110105
let _ = child.kill();
111106
was_killed = true;
112107
}
113-
thread
108+
result
114109
};
115-
stderr_thread.join().unwrap().unwrap();
110+
result.expect("process wrapper error: failed to process stderr");
116111

117112
let status = child
118113
.wait()

util/process_wrapper/output.rs

+48-19
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,56 @@
1+
// Copyright 2020 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
use std::io::{self, prelude::*};
2-
use std::thread;
316

17+
/// LineOutput tells process_output what to do when a line is processed.
18+
/// If a Message is returned, it will be written to write_end, if
19+
/// Skip is returned nothing will be printed and execution continues,
20+
/// if Terminate is returned, process_output returns immediately.
21+
/// Terminate is used to stop processing when we see an emit metadata
22+
/// message.
23+
#[derive(Debug)]
24+
pub(crate) enum LineOutput {
25+
Message(String),
26+
Skip,
27+
Terminate,
28+
}
29+
30+
/// process_output reads lines from read_end and invokes process_line on each.
31+
/// Depending on the result of process_line, the modified message may be written
32+
/// to write_end.
433
pub(crate) fn process_output<F>(
5-
read_end: Box<dyn Read + Send>,
6-
write_end: Box<dyn Write + Send>,
34+
read_end: Box<dyn Read>,
35+
write_end: Box<dyn Write>,
736
mut process_line: F,
8-
) -> thread::JoinHandle<io::Result<()>>
37+
) -> io::Result<()>
938
where
10-
F: FnMut(String) -> Option<String> + Send + 'static,
39+
F: FnMut(String) -> LineOutput,
1140
{
12-
thread::spawn(move || {
13-
let mut reader = io::BufReader::new(read_end);
14-
let mut writer = io::LineWriter::new(write_end);
15-
loop {
16-
let mut line = String::new();
17-
let read_bytes = reader.read_line(&mut line)?;
18-
if read_bytes == 0 {
19-
break;
20-
}
21-
if let Some(to_write) = process_line(line) {
22-
writer.write_all(to_write.as_bytes())?;
23-
}
41+
let mut reader = io::BufReader::new(read_end);
42+
let mut writer = io::LineWriter::new(write_end);
43+
loop {
44+
let mut line = String::new();
45+
let read_bytes = reader.read_line(&mut line)?;
46+
if read_bytes == 0 {
47+
break;
2448
}
25-
Ok(())
26-
})
49+
match process_line(line) {
50+
LineOutput::Message(to_write) => writer.write_all(to_write.as_bytes())?,
51+
LineOutput::Skip => {}
52+
LineOutput::Terminate => return Ok(()),
53+
};
54+
}
55+
Ok(())
2756
}

util/process_wrapper/rustc.rs

+35-17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,21 @@
1-
use std::sync::mpsc::SyncSender;
1+
// Copyright 2020 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
215
use tinyjson::JsonValue;
316

17+
use crate::output::LineOutput;
18+
419
#[derive(Debug, Copy, Clone)]
520
pub(crate) enum ErrorFormat {
621
Json,
@@ -25,36 +40,39 @@ fn get_key(value: &JsonValue, key: &str) -> Option<String> {
2540
}
2641
}
2742

28-
// stop_on_rmeta_completion takes an output line from rustc configured with
29-
// --error-format=json, parses the json and returns the appropriate output
30-
// according to the original --error-format supplied to rustc.
31-
// In addition, it will send a signal to the stop channel when metadata is emitted
32-
// so the compiler can be terminated.
33-
// This is used to implement pipelining in rules_rust, please see
34-
// https://internals.rust-lang.org/t/evaluating-pipelined-rustc-compilation/10199
43+
/// stop_on_rmeta_completion takes an output line from rustc configured with
44+
/// --error-format=json, parses the json and returns the appropriate output
45+
/// according to the original --error-format supplied to rustc.
46+
/// In addition, it will signal to stop when metadata is emitted
47+
/// so the compiler can be terminated.
48+
/// This is used to implement pipelining in rules_rust, please see
49+
/// https://internals.rust-lang.org/t/evaluating-pipelined-rustc-compilation/10199
3550
pub(crate) fn stop_on_rmeta_completion(
3651
line: String,
3752
error_format: ErrorFormat,
38-
stop: &SyncSender<()>,
39-
) -> Option<String> {
53+
kill: &mut bool,
54+
) -> LineOutput {
4055
let parsed: JsonValue = line
4156
.parse()
4257
.expect("process wrapper error: expected json messages in pipeline mode");
4358
if let Some(emit) = get_key(&parsed, "emit") {
4459
// We don't want to print emit messages.
4560
// If the emit messages is "metadata" we can signal the process to quit
46-
if emit == "metadata" {
47-
stop.send(())
48-
.expect("process wrapper error: receiver closed");
49-
}
50-
return None;
61+
return if emit == "metadata" {
62+
*kill = true;
63+
LineOutput::Terminate
64+
} else {
65+
LineOutput::Skip
66+
};
5167
};
5268

5369
match error_format {
5470
// If the output should be json, we just forward the messages as-is
55-
ErrorFormat::Json => Some(line),
71+
ErrorFormat::Json => LineOutput::Message(line),
5672
// Otherwise we extract the "rendered" attribute.
5773
// If we don't find it we skip the line.
58-
_ => get_key(&parsed, "rendered"),
74+
_ => get_key(&parsed, "rendered")
75+
.map(LineOutput::Message)
76+
.unwrap_or(LineOutput::Skip),
5977
}
6078
}

0 commit comments

Comments
 (0)