Skip to content

Commit 7eeaea1

Browse files
committed
refactor: use a separate task for the relay disco recv queue
1 parent 2f94cb4 commit 7eeaea1

File tree

2 files changed

+30
-24
lines changed

2 files changed

+30
-24
lines changed

iroh/src/magicsock.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1785,7 +1785,13 @@ impl Handle {
17851785

17861786
let mut actor_tasks = JoinSet::default();
17871787

1788-
let relay_actor = RelayActor::new(msock.clone(), relay_datagram_recv_queue, relay_protocol);
1788+
let (relay_disco_recv_tx, mut relay_disco_recv_rx) = tokio::sync::mpsc::channel(1024);
1789+
let relay_actor = RelayActor::new(
1790+
msock.clone(),
1791+
relay_datagram_recv_queue,
1792+
relay_disco_recv_tx,
1793+
relay_protocol,
1794+
);
17891795
let relay_actor_cancel_token = relay_actor.cancel_token();
17901796
actor_tasks.spawn(
17911797
async move {
@@ -1795,6 +1801,22 @@ impl Handle {
17951801
}
17961802
.instrument(info_span!("relay-actor")),
17971803
);
1804+
actor_tasks.spawn({
1805+
let msock = msock.clone();
1806+
async move {
1807+
while let Some(message) = relay_disco_recv_rx.recv().await {
1808+
msock.handle_disco_message(
1809+
message.source,
1810+
&message.sealed_box,
1811+
DiscoMessageSource::Relay {
1812+
url: message.relay_url,
1813+
key: message.relay_remote_node_id,
1814+
},
1815+
);
1816+
}
1817+
}
1818+
.instrument(info_span!("relay-disco-recv"))
1819+
});
17981820

17991821
#[cfg(not(wasm_browser))]
18001822
let _ = actor_tasks.spawn({

iroh/src/magicsock/relay_actor.rs

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ use super::RelayDatagramSendChannelReceiver;
6060
#[cfg(not(wasm_browser))]
6161
use crate::dns::DnsResolver;
6262
use crate::{
63-
magicsock::{
64-
DiscoMessageSource, MagicSock, Metrics as MagicsockMetrics, RelayContents,
65-
RelayDatagramRecvQueue,
66-
},
63+
magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayDatagramRecvQueue},
6764
util::MaybeFuture,
6865
};
6966

@@ -165,11 +162,11 @@ struct ActiveRelayActor {
165162
}
166163

167164
#[derive(Debug)]
168-
struct RelayDiscoMessage {
169-
source: PublicKey,
170-
sealed_box: Bytes,
171-
relay_url: RelayUrl,
172-
relay_remote_node_id: PublicKey,
165+
pub(super) struct RelayDiscoMessage {
166+
pub(super) source: PublicKey,
167+
pub(super) sealed_box: Bytes,
168+
pub(super) relay_url: RelayUrl,
169+
pub(super) relay_remote_node_id: PublicKey,
173170
}
174171

175172
#[derive(Debug)]
@@ -850,7 +847,6 @@ pub(super) struct RelayActor {
850847
/// [`AsyncUdpSocket::poll_recv`]: quinn::AsyncUdpSocket::poll_recv
851848
relay_datagram_recv_queue: Arc<RelayDatagramRecvQueue>,
852849
relay_disco_recv_tx: mpsc::Sender<RelayDiscoMessage>,
853-
relay_disco_recv_rx: mpsc::Receiver<RelayDiscoMessage>,
854850
/// The actors managing each currently used relay server.
855851
///
856852
/// These actors will exit when they have any inactivity. Otherwise they will keep
@@ -866,15 +862,14 @@ impl RelayActor {
866862
pub(super) fn new(
867863
msock: Arc<MagicSock>,
868864
relay_datagram_recv_queue: Arc<RelayDatagramRecvQueue>,
865+
relay_disco_recv_tx: mpsc::Sender<RelayDiscoMessage>,
869866
protocol: iroh_relay::http::Protocol,
870867
) -> Self {
871868
let cancel_token = CancellationToken::new();
872-
let (relay_disco_recv_tx, relay_disco_recv_rx) = mpsc::channel(1024);
873869
Self {
874870
msock,
875871
relay_datagram_recv_queue,
876872
relay_disco_recv_tx,
877-
relay_disco_recv_rx,
878873
active_relays: Default::default(),
879874
active_relay_tasks: JoinSet::new(),
880875
cancel_token,
@@ -902,17 +897,6 @@ impl RelayActor {
902897
debug!("shutting down");
903898
break;
904899
}
905-
Some(message) = self.relay_disco_recv_rx.recv() => {
906-
self.msock.
907-
handle_disco_message(
908-
message.source,
909-
&message.sealed_box,
910-
DiscoMessageSource::Relay {
911-
url: message.relay_url,
912-
key: message.relay_remote_node_id
913-
},
914-
);
915-
}
916900
Some(res) = self.active_relay_tasks.join_next() => {
917901
match res {
918902
Ok(()) => (),

0 commit comments

Comments
 (0)