1
1
use futures:: {
2
2
future:: { BoxFuture , OptionFuture } ,
3
3
stream:: BoxStream ,
4
- Future , FutureExt , StreamExt ,
4
+ Future , FutureExt , Stream , StreamExt ,
5
5
} ;
6
6
use once_cell:: sync:: Lazy ;
7
7
use serde:: Deserialize ;
@@ -26,7 +26,7 @@ use tokio::{
26
26
} ;
27
27
use tokio_stream:: wrappers:: ReceiverStream ;
28
28
use tokio_util:: { io:: SyncIoBridge , sync:: CancellationToken } ;
29
- use tracing:: { instrument, trace, trace_span, warn, Instrument } ;
29
+ use tracing:: { info_span , instrument, trace, trace_span, warn, Instrument } ;
30
30
31
31
use crate :: {
32
32
bincode_input_closed,
@@ -764,8 +764,24 @@ impl<T> WithOutput<T> {
764
764
where
765
765
F : Future < Output = Result < T , E > > ,
766
766
{
767
- let stdout = ReceiverStream :: new ( stdout_rx) . collect ( ) ;
768
- let stderr = ReceiverStream :: new ( stderr_rx) . collect ( ) ;
767
+ Self :: try_absorb_stream (
768
+ task,
769
+ ReceiverStream :: new ( stdout_rx) ,
770
+ ReceiverStream :: new ( stderr_rx) ,
771
+ )
772
+ . await
773
+ }
774
+
775
+ async fn try_absorb_stream < F , E > (
776
+ task : F ,
777
+ stdout_rx : impl Stream < Item = String > ,
778
+ stderr_rx : impl Stream < Item = String > ,
779
+ ) -> Result < WithOutput < T > , E >
780
+ where
781
+ F : Future < Output = Result < T , E > > ,
782
+ {
783
+ let stdout = stdout_rx. collect ( ) ;
784
+ let stderr = stderr_rx. collect ( ) ;
769
785
770
786
let ( result, stdout, stderr) = join ! ( task, stdout, stderr) ;
771
787
let response = result?;
@@ -815,6 +831,15 @@ pub struct Coordinator<B> {
815
831
token : CancellationToken ,
816
832
}
817
833
834
+ /// Runs things.
835
+ ///
836
+ /// # Liveness concerns
837
+ ///
838
+ /// If you use one of the streaming versions (e.g. `begin_execute`),
839
+ /// you need to make sure that the stdout / stderr / status channels
840
+ /// are continuously read from or dropped completely. If not, one
841
+ /// channel can fill up, preventing the other channels from receiving
842
+ /// data as well.
818
843
impl < B > Coordinator < B >
819
844
where
820
845
B : Backend ,
@@ -2610,6 +2635,9 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
2610
2635
2611
2636
let ( tx, from_worker_rx) = mpsc:: channel ( 8 ) ;
2612
2637
tasks. spawn_blocking ( move || {
2638
+ let span = info_span ! ( "child_io_queue::input" ) ;
2639
+ let _span = span. enter ( ) ;
2640
+
2613
2641
let stdout = SyncIoBridge :: new ( stdout) ;
2614
2642
let mut stdout = BufReader :: new ( stdout) ;
2615
2643
@@ -2632,6 +2660,9 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
2632
2660
2633
2661
let ( to_worker_tx, mut rx) = mpsc:: channel ( 8 ) ;
2634
2662
tasks. spawn_blocking ( move || {
2663
+ let span = info_span ! ( "child_io_queue::output" ) ;
2664
+ let _span = span. enter ( ) ;
2665
+
2635
2666
let stdin = SyncIoBridge :: new ( stdin) ;
2636
2667
let mut stdin = BufWriter :: new ( stdin) ;
2637
2668
@@ -3182,33 +3213,35 @@ mod tests {
3182
3213
let ActiveExecution {
3183
3214
task,
3184
3215
stdin_tx : _,
3185
- mut stdout_rx,
3216
+ stdout_rx,
3186
3217
stderr_rx,
3187
3218
status_rx : _,
3188
3219
} = coordinator
3189
3220
. begin_execute ( token. clone ( ) , request)
3190
3221
. await
3191
3222
. unwrap ( ) ;
3192
3223
3193
- // Wait for some output before killing
3194
- let early_stdout = stdout_rx . recv ( ) . with_timeout ( ) . await . unwrap ( ) ;
3224
+ let stdout_rx = ReceiverStream :: new ( stdout_rx ) ;
3225
+ let stderr_rx = ReceiverStream :: new ( stderr_rx ) ;
3195
3226
3196
- token. cancel ( ) ;
3227
+ // We (a) want to wait for some output before we try to
3228
+ // kill the process and (b) need to keep pumping stdout /
3229
+ // stderr / status to avoid locking up the output.
3230
+ let stdout_rx = stdout_rx. inspect ( |_| token. cancel ( ) ) ;
3197
3231
3198
3232
let WithOutput {
3199
3233
response,
3200
3234
stdout,
3201
3235
stderr,
3202
- } = WithOutput :: try_absorb ( task, stdout_rx, stderr_rx)
3236
+ } = WithOutput :: try_absorb_stream ( task, stdout_rx, stderr_rx)
3203
3237
. with_timeout ( )
3204
3238
. await
3205
3239
. unwrap ( ) ;
3206
3240
3207
3241
assert ! ( !response. success, "{stderr}" ) ;
3208
3242
assert_contains ! ( response. exit_detail, "kill" ) ;
3209
3243
3210
- assert_contains ! ( early_stdout, "Before" ) ;
3211
- assert_not_contains ! ( stdout, "Before" ) ;
3244
+ assert_contains ! ( stdout, "Before" ) ;
3212
3245
assert_not_contains ! ( stdout, "After" ) ;
3213
3246
3214
3247
coordinator. shutdown ( ) . await ?;
0 commit comments