Skip to content

Commit c52390a

Browse files
committed
fix: store a list of send wakers on the relay data queue
1 parent ceaca9f commit c52390a

File tree

1 file changed

+36
-8
lines changed

1 file changed

+36
-8
lines changed

iroh/src/magicsock.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,7 +2116,7 @@ impl RelayDatagramSendChannelReceiver {
21162116
struct RelayDatagramRecvQueue {
21172117
queue: ConcurrentQueue<RelayRecvDatagram>,
21182118
recv_waker: AtomicWaker,
2119-
send_waker: AtomicWaker,
2119+
send_wakers: ConcurrentQueue<Waker>,
21202120
}
21212121

21222122
impl RelayDatagramRecvQueue {
@@ -2125,7 +2125,7 @@ impl RelayDatagramRecvQueue {
21252125
Self {
21262126
queue: ConcurrentQueue::bounded(512),
21272127
recv_waker: AtomicWaker::new(),
2128-
send_waker: AtomicWaker::new(),
2128+
send_wakers: ConcurrentQueue::unbounded(),
21292129
}
21302130
}
21312131

@@ -2142,14 +2142,42 @@ impl RelayDatagramRecvQueue {
21422142
})
21432143
}
21442144

2145+
/// Polls for whether the queue has free slots for sending items.
2146+
///
2147+
/// If the queue has free slots, this returns [`Poll::Ready`].
2148+
/// If the queue is full, [`Poll::Pending`] is returned and the waker
2149+
/// is stored and woken once the queue has free slots.
2150+
///
2151+
/// This can be called from multiple tasks concurrently. If a slot becomes
2152+
/// available, all stored wakers will be woken simultaneously.
2153+
/// This also means that even if [`Poll::Ready`] is returned, it is not
2154+
/// guaranteed that [`Self::try_send`] will return `Ok` on the next call,
2155+
/// because another send task could have used the slot already.
21452156
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
21462157
if self.queue.is_closed() {
21472158
Poll::Ready(Err(anyhow!("Queue closed")))
21482159
} else if !self.queue.is_full() {
21492160
Poll::Ready(Ok(()))
21502161
} else {
2151-
self.send_waker.register(cx.waker());
2152-
Poll::Pending
2162+
match self.send_wakers.push(cx.waker().clone()) {
2163+
Ok(()) => Poll::Pending,
2164+
Err(concurrent_queue::PushError::Full(_)) => {
2165+
unreachable!("Send waker queue is unbounded")
2166+
}
2167+
Err(concurrent_queue::PushError::Closed(_)) => {
2168+
Poll::Ready(Err(anyhow!("Queue closed")))
2169+
}
2170+
}
2171+
}
2172+
}
2173+
2174+
async fn send_ready(&self) -> Result<()> {
2175+
std::future::poll_fn(|cx| self.poll_send_ready(cx)).await
2176+
}
2177+
2178+
fn wake_senders(&self) {
2179+
while let Ok(waker) = self.send_wakers.pop() {
2180+
waker.wake();
21532181
}
21542182
}
21552183

@@ -2168,28 +2196,28 @@ impl RelayDatagramRecvQueue {
21682196
fn poll_recv(&self, cx: &mut Context) -> Poll<Result<RelayRecvDatagram>> {
21692197
match self.queue.pop() {
21702198
Ok(value) => {
2171-
self.send_waker.wake();
2199+
self.wake_senders();
21722200
Poll::Ready(Ok(value))
21732201
}
21742202
Err(concurrent_queue::PopError::Empty) => {
21752203
self.recv_waker.register(cx.waker());
21762204

21772205
match self.queue.pop() {
21782206
Ok(value) => {
2179-
self.send_waker.wake();
21802207
self.recv_waker.take();
2208+
self.wake_senders();
21812209
Poll::Ready(Ok(value))
21822210
}
21832211
Err(concurrent_queue::PopError::Empty) => Poll::Pending,
21842212
Err(concurrent_queue::PopError::Closed) => {
21852213
self.recv_waker.take();
2186-
self.send_waker.wake();
2214+
self.wake_senders();
21872215
Poll::Ready(Err(anyhow!("Queue closed")))
21882216
}
21892217
}
21902218
}
21912219
Err(concurrent_queue::PopError::Closed) => {
2192-
self.send_waker.wake();
2220+
self.wake_senders();
21932221
Poll::Ready(Err(anyhow!("Queue closed")))
21942222
}
21952223
}

0 commit comments

Comments
 (0)