Discovered this during some ad hoc testing with multiple connections to a single guest's serial console, where I observed some data being sent a second time to a listener that had already received it.
If the serial console task has read some data from the guest that needs to be written to listeners, it generates a future to do that writing and then includes it in the main select! that determines the task's next action (lines 116-125 below where cur_output is Some):
|
let (uart_read, ws_send) = |
|
if ws_sinks.is_empty() || cur_output.is_none() { |
|
(serial.read_source(&mut output).fuse(), Fuse::terminated()) |
|
} else { |
|
let range = cur_output.clone().unwrap(); |
|
( |
|
Fuse::terminated(), |
|
if !ws_sinks.is_empty() { |
|
futures::stream::iter( |
|
ws_sinks.iter_mut().zip(std::iter::repeat( |
|
Vec::from(&output[range]), |
|
)), |
|
) |
|
.for_each_concurrent(4, |((_i, ws), bin)| { |
|
ws.send(Message::binary(bin)).map(|_| ()) |
|
}) |
|
.fuse() |
|
} else { |
|
Fuse::terminated() |
|
}, |
|
) |
|
}; |
cur_output is only set to None if this future is the one chosen by select!:
|
// Transmit bytes from the UART through the WS |
|
_ = ws_send => { |
|
probes::serial_uart_out!(|| {}); |
|
cur_output = None; |
|
} |
But if this branch is not chosen, for_each_concurrent may have sent the current output bytes to some listeners and not to others. (The underlying websocket send may not be cancel-safe either but I haven't looked at its implementation.) This will cause the same data to be re-sent in the next loop iteration.
An approach along the lines suggested in RFD 400 section 5.2 might help here (and other approaches may work too):
- If there's already a send-to-clients future active, try to resolve it
- Else if there's no such future but there is some data waiting to be sent, create a new future to try to send that data
- Else try to read more data from the guest
Discovered this during some ad hoc testing with multiple connections to a single guest's serial console, where I observed some data being sent a second time to a listener that had already received it.
If the serial console task has read some data from the guest that needs to be written to listeners, it generates a future to do that writing and then includes it in the main
select!that determines the task's next action (lines 116-125 below wherecur_outputisSome):propolis/bin/propolis-server/src/lib/serial/mod.rs
Lines 109 to 130 in 1b34075
cur_outputis only set toNoneif this future is the one chosen byselect!:propolis/bin/propolis-server/src/lib/serial/mod.rs
Lines 226 to 230 in 1b34075
But if this branch is not chosen,
for_each_concurrentmay have sent the current output bytes to some listeners and not to others. (The underlying websocketsendmay not be cancel-safe either but I haven't looked at its implementation.) This will cause the same data to be re-sent in the next loop iteration.An approach along the lines suggested in RFD 400 section 5.2 might help here (and other approaches may work too):