2 changes: 1 addition & 1 deletion protocols/stream/src/behaviour.rs
@@ -32,7 +32,7 @@ impl Default for Behaviour {
 
 impl Behaviour {
     pub fn new() -> Self {
-        let (dial_sender, dial_receiver) = mpsc::channel(0);
+        let (dial_sender, dial_receiver) = mpsc::channel(32);
 
         Self {
             shared: Arc::new(Mutex::new(Shared::new(dial_sender))),
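
Context for the capacity change, as an aside (not part of the diff): with futures::channel::mpsc, channel(n) provides n buffered slots plus one guaranteed slot per sender. try_send fails immediately when no slot is free, while send(..).await waits for capacity — that wait is the backpressure this fix relies on. A minimal, hedged sketch of the difference:

use futures::{channel::mpsc, SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    // channel(0): no shared buffer, only the sender's one guaranteed slot.
    let (mut tx, mut rx) = mpsc::channel::<u32>(0);

    tx.try_send(1).unwrap();
    // The slot is now occupied, so try_send fails; wrapping it in `let _ =`
    // (as the old code did) silently drops the message.
    assert!(tx.try_send(2).is_err());

    // send(..).await instead parks the task until the receiver frees a slot.
    let producer = tokio::spawn(async move {
        tx.send(2).await.unwrap();
    });

    assert_eq!(rx.next().await, Some(1)); // frees the slot; producer resumes
    assert_eq!(rx.next().await, Some(2));
    producer.await.unwrap();
}
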
12 changes: 11 additions & 1 deletion protocols/stream/src/control.rs
@@ -48,7 +48,17 @@ impl Control {
     ) -> Result<Stream, OpenStreamError> {
         tracing::debug!(%peer, "Requesting new stream");
 
-        let mut new_stream_sender = Shared::lock(&self.shared).sender(peer);
+        let (mut new_stream_sender, dial_info) = {
+            let mut shared = Shared::lock(&self.shared);
+            shared.sender(peer)
+        };
+
+        // If we need to dial, do it outside the lock with backpressure.
+        if let Some((peer_to_dial, mut dial_sender)) = dial_info {
+            // Propagate backpressure by awaiting the send instead of try_send.
+            dial_sender.send(peer_to_dial).await
+                .map_err(|e| io::Error::new(io::ErrorKind::ConnectionReset, e))?;
+        }
 
         let (sender, receiver) = oneshot::channel();
 
9 changes: 4 additions & 5 deletions protocols/stream/src/shared.rs
@@ -122,7 +122,7 @@ impl Shared {
         }
     }
 
-    pub(crate) fn sender(&mut self, peer: PeerId) -> mpsc::Sender<NewStream> {
+    pub(crate) fn sender(&mut self, peer: PeerId) -> (mpsc::Sender<NewStream>, Option<(PeerId, mpsc::Sender<PeerId>)>) {

Member:
Can we make this function async and do the send in the function instead of returning the Sender?

Author:
please have a look now!

Member:
Thanks for the quick follow-up.
Why is it not possible to make the sender fn async?

Author:
Shared::sender needs &mut self, and we only have that while holding the MutexGuard. If we make the whole function async, the guard stays alive across the .await, which blocks other parts of the code that need Shared, and in some cases the compiler doesn't even allow it. So we grab what we need under the lock, drop it, and then do the async send afterwards.
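
A minimal sketch of that pattern (hypothetical State and request_dial names, assuming a std::sync::Mutex like the one wrapping Shared):

use std::sync::{Arc, Mutex};
use futures::{channel::mpsc, SinkExt};

struct State {
    dial_sender: mpsc::Sender<u64>,
}

async fn request_dial(state: Arc<Mutex<State>>, peer: u64) {
    // Clone what we need while the guard is alive, then drop the guard.
    // Keeping the guard across the .await would make the future !Send and
    // block every other user of the mutex for the duration of the send.
    let mut dial_sender = {
        let guard = state.lock().unwrap();
        guard.dial_sender.clone()
    }; // guard dropped here

    let _ = dial_sender.send(peer).await;
}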

Member (@elenaf9, Nov 30, 2025):
Yes, you're right; we'd need to manually return -> impl Future<Output = mpsc::Sender<NewStream>> and then clone the dial_sender before returning the async block.

However, the larger issue with this is that each clone of the dial_sender increases the channel's capacity by one. So effectively, we create an unbounded channel with this and prevent backpressure.

Still, I agree that we shouldn't hold the lock while blocking on the future.
So I guess the way to do this would be to create a Shared::poll_sender(PeerId, &mut Context<'_>) -> Poll<mpsc::Sender> function and poll that in Control::open_stream with poll_fn(|cx| Shared::lock(...).poll_sender(cx)) or something like that. Then we don't need to clone dial_sender.
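
Roughly, an untested sketch against the crate's Shared and PeerId types, with the method name as proposed above:

use std::io;
use std::task::{Context, Poll};

impl Shared {
    /// Poll the dial channel for capacity and send without cloning dial_sender.
    /// The lock is re-acquired on every poll, so no guard is held across an await.
    pub(crate) fn poll_send_dial(
        &mut self,
        cx: &mut Context<'_>,
        peer: PeerId,
    ) -> Poll<io::Result<()>> {
        match self.dial_sender.poll_ready(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(())) => Poll::Ready(
                self.dial_sender
                    .start_send(peer)
                    .map_err(|e| io::Error::new(io::ErrorKind::ConnectionReset, e)),
            ),
            Poll::Ready(Err(e)) => {
                Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, e)))
            }
        }
    }
}

// In Control::open_stream, something like:
// futures::future::poll_fn(|cx| Shared::lock(&self.shared).poll_send_dial(cx, peer)).await?;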

Author:
I tried implementing the poll_sender approach but ran into waker-handling issues with the mutex: tasks would hang because the waker registered during one lock acquisition didn't properly wake on subsequent polls. I switched to an unbounded channel approach instead. It fixes the silent-drop problem and avoids the mutex/waker complexity. Would you prefer I continue debugging the poll_sender approach, or is the unbounded solution acceptable?

Author:
My bad, you already said we don't want an unbounded channel. I tried using poll_fn with poll_ready() to drop the lock between polls: poll_fn(|cx| Shared::lock(&self.shared).poll_send_dial(cx, peer)).await.
However, when poll_ready() returns Poll::Pending, the waker is registered while holding the mutex. When the channel becomes ready and tries to wake the task, the waker needs to acquire the same mutex to make progress, causing a deadlock. Is there anything I'm missing on how to handle this? Guidance would be much appreciated.

         let maybe_sender = self
             .connections
             .iter()
@@ -134,7 +134,7 @@ impl Shared {
             Some(sender) => {
                 tracing::debug!("Returning sender to existing connection");
 
-                sender.clone()
+                (sender.clone(), None)
             }
             None => {
                 tracing::debug!(%peer, "Not connected to peer, initiating dial");
@@ -144,9 +144,8 @@ impl Shared {
                 .entry(peer)
                 .or_insert_with(|| mpsc::channel(0));
 
-                let _ = self.dial_sender.try_send(peer);
-
-                sender.clone()
+                // Return both the pending-channel sender and the dial_sender for async use.
+                (sender.clone(), Some((peer, self.dial_sender.clone())))
             }
         }
     }
44 changes: 44 additions & 0 deletions protocols/stream/tests/lib.rs
@@ -78,3 +78,47 @@ async fn dial_errors_are_propagated() {
     assert_eq!(e.kind(), io::ErrorKind::NotConnected);
     assert_eq!("Dial error: no addresses for peer.", e.to_string());
 }
+
+#[tokio::test]
+async fn backpressure_on_many_concurrent_dials() {
+    let _ = tracing_subscriber::fmt()
+        .with_env_filter(
+            EnvFilter::builder()
+                .with_default_directive(LevelFilter::DEBUG.into())
+                .from_env()
+                .unwrap(),
+        )
+        .with_test_writer()
+        .try_init();
+
+    let swarm1 = Swarm::new_ephemeral_tokio(|_| stream::Behaviour::new());
+    let control = swarm1.behaviour().new_control();
+
+    tokio::spawn(swarm1.loop_on_next());
+
+    // Spawn many concurrent dial attempts that will all fail.
+    // Before the fix: some would be silently dropped and hang forever.
+    // After the fix: all should fail with proper errors (backpressure propagated).
+    let mut handles = vec![];
+
+    for _ in 0..50 {
+        let mut control_clone = control.clone();
+        let handle = tokio::spawn(async move {
+            let result = control_clone.open_stream(PeerId::random(), PROTOCOL).await;
+            // All should fail; none should hang.
+            assert!(result.is_err());
+        });
+        handles.push(handle);
+    }
+
+    // All tasks should complete (not hang indefinitely).
+    for handle in handles {
+        tokio::time::timeout(
+            std::time::Duration::from_secs(5),
+            handle,
+        )
+        .await
+        .expect("Task should not hang - backpressure should work")
+        .expect("Task should complete successfully");
+    }
+}