diff --git a/Cargo.lock b/Cargo.lock index 94a9bc5f73..084957ef05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2403,8 +2403,7 @@ dependencies = [ [[package]] name = "iroh-quinn" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c6245c9ed906506ab9185e8d7f64857129aee4f935e899f398a3bd3b70338d" +source = "git+https://github.com/n0-computer/quinn.git?branch=matheus23/poll_writable_remote#f8bfe993842da46d57cbb4c7e26230b4ab525a29" dependencies = [ "bytes", "cfg_aliases", @@ -2423,8 +2422,7 @@ dependencies = [ [[package]] name = "iroh-quinn-proto" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "929d5d8fa77d5c304d3ee7cae9aede31f13908bd049f9de8c7c0094ad6f7c535" +source = "git+https://github.com/n0-computer/quinn.git?branch=matheus23/poll_writable_remote#f8bfe993842da46d57cbb4c7e26230b4ab525a29" dependencies = [ "bytes", "getrandom 0.2.16", @@ -2444,8 +2442,7 @@ dependencies = [ [[package]] name = "iroh-quinn-udp" version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c53afaa1049f7c83ea1331f5ebb9e6ebc5fdd69c468b7a22dd598b02c9bcc973" +source = "git+https://github.com/n0-computer/quinn.git?branch=matheus23/poll_writable_remote#f8bfe993842da46d57cbb4c7e26230b4ab525a29" dependencies = [ "cfg_aliases", "libc", diff --git a/Cargo.toml b/Cargo.toml index 593f1d0ec7..49eea0a595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,3 +40,8 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l [workspace.lints.clippy] unused-async = "warn" + +[patch.crates-io] +iroh-quinn = { git = "https://github.com/n0-computer/quinn.git", branch = "matheus23/poll_writable_remote" } +iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn.git", branch = "matheus23/poll_writable_remote" } +iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn.git", branch = "matheus23/poll_writable_remote" } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 21936aa2cb..302e266b0c 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -224,7 +224,7 @@ pub(crate) struct MagicSock { /// Nearest relay node ID; 0 means none/unknown. my_relay: Watchable>, /// Tracks the networkmap node entity for each node discovery key. - node_map: NodeMap, + node_map: Arc, /// Tracks the mapped IP addresses ip_mapped_addrs: IpMappedAddresses, /// NetReport client @@ -1790,7 +1790,7 @@ impl Handle { my_relay: Default::default(), net_reporter: net_reporter.addr(), disco_secrets: DiscoSecrets::default(), - node_map, + node_map: Arc::new(node_map), ip_mapped_addrs, udp_disco_sender, discovery, @@ -2184,7 +2184,7 @@ impl RelayDatagramRecvQueue { } impl AsyncUdpSocket for MagicSock { - fn create_io_poller(self: Arc) -> Pin> { + fn create_io_poller(self: Arc, remote: SocketAddr) -> Pin> { // To do this properly the MagicSock would need a registry of pollers. For each // node we would look up the poller or create one. Then on each try_send we can // look up the correct poller and configure it to poll the paths it needs. @@ -2205,6 +2205,11 @@ impl AsyncUdpSocket for MagicSock { let ipv6_poller = self.sockets.v6.as_ref().map(|sock| sock.create_io_poller()); let relay_sender = self.relay_datagram_send_channel.clone(); Box::pin(IoPoller { + mapped_addr: MappedAddr::from(remote), + node_map: self.node_map.clone(), + metrics: self.metrics.magicsock.clone(), + ip_mapped_addrs: self.ip_mapped_addrs.clone(), + ipv6_reported: self.ipv6_reported.clone(), #[cfg(not(wasm_browser))] ipv4_poller, #[cfg(not(wasm_browser))] @@ -2303,6 +2308,11 @@ impl AsyncUdpSocket for MagicSock { #[derive(Debug)] struct IoPoller { + mapped_addr: MappedAddr, + node_map: Arc, + metrics: Arc, + ip_mapped_addrs: IpMappedAddresses, + ipv6_reported: Arc, #[cfg(not(wasm_browser))] ipv4_poller: Pin>, #[cfg(not(wasm_browser))] @@ -2312,21 +2322,60 @@ struct IoPoller { impl quinn::UdpPoller for IoPoller { fn poll_writable(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // This version returns Ready as soon as any of them are ready. let this = &mut *self; - #[cfg(not(wasm_browser))] - match this.ipv4_poller.as_mut().poll_writable(cx) { - Poll::Ready(_) => return Poll::Ready(Ok(())), - Poll::Pending => (), - } - #[cfg(not(wasm_browser))] - if let Some(ref mut ipv6_poller) = this.ipv6_poller { - match ipv6_poller.as_mut().poll_writable(cx) { - Poll::Ready(_) => return Poll::Ready(Ok(())), - Poll::Pending => (), + let udp_addr = match &this.mapped_addr { + MappedAddr::None(dest) => { + error!(%dest, "Cannot convert to a mapped address, voiding transmit."); + // Because we can't send these, we just stall whatever endpoint driver got into this state + // TODO: Maybe we shoulde error out instead? Since there's no way this recovers, right? + return Poll::Pending; } - } - this.relay_sender.poll_writable(cx) + MappedAddr::NodeId(dest) => { + trace!(%dest, "polling writable"); + + // Get the node's relay address and best direct address, as well + // as any pings that need to be sent for hole-punching purposes. + match this + .node_map + .peek_addr(*dest, this.ipv6_reported.load(Ordering::Relaxed)) + { + Some((_, None, Some(_relay_url))) => { + return this.relay_sender.poll_writable(cx) + } + Some((_, Some(udp_addr), None)) => udp_addr, + Some((_, Some(udp_addr), Some(_relay_url))) => { + // If we're in mixed connection mode, then wait for anything to be ready. + if let Poll::Ready(r) = this.relay_sender.poll_writable(cx) { + return Poll::Ready(r); + } + udp_addr + } + _ => { + // TODO ensure this is correct + return Poll::Pending; + } + } + } + MappedAddr::Ip(mapped_addr) => { + let Some(udp_addr) = this.ip_mapped_addrs.get_ip_addr(mapped_addr) else { + // TODO idk + return Poll::Pending; + }; + udp_addr + } + }; + + let poller = match udp_addr { + SocketAddr::V4(_) => this.ipv4_poller.as_mut(), + SocketAddr::V6(_) => { + let Some(poller) = this.ipv6_poller.as_mut() else { + // TODO error? Trace? + return Poll::Pending; + }; + poller.as_mut() + } + }; + poller.poll_writable(cx) } } diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 5b239b0a38..c0f0045d12 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -2,7 +2,7 @@ use std::{ collections::{hash_map::Entry, BTreeSet, HashMap}, hash::Hash, net::{IpAddr, SocketAddr}, - sync::Mutex, + sync::RwLock, }; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; @@ -54,7 +54,7 @@ const MAX_INACTIVE_NODES: usize = 30; /// An index of nodeInfos by node key, NodeIdMappedAddr, and discovered ip:port endpoints. #[derive(Debug, Default)] pub(super) struct NodeMap { - inner: Mutex, + inner: RwLock, } #[derive(Default, Debug)] @@ -142,21 +142,21 @@ impl NodeMap { fn from_inner(inner: NodeMapInner) -> Self { Self { - inner: Mutex::new(inner), + inner: RwLock::new(inner), } } /// Add the contact information for a node. pub(super) fn add_node_addr(&self, node_addr: NodeAddr, source: Source, metrics: &Metrics) { self.inner - .lock() + .write() .expect("poisoned") .add_node_addr(node_addr, source, metrics) } /// Number of nodes currently listed. pub(super) fn node_count(&self) -> usize { - self.inner.lock().expect("poisoned").node_count() + self.inner.read().expect("poisoned").node_count() } #[cfg(not(wasm_browser))] @@ -164,12 +164,12 @@ impl NodeMap { &self, udp_addr: SocketAddr, ) -> Option<(PublicKey, NodeIdMappedAddr)> { - self.inner.lock().expect("poisoned").receive_udp(udp_addr) + self.inner.write().expect("poisoned").receive_udp(udp_addr) } pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> NodeIdMappedAddr { self.inner - .lock() + .write() .expect("poisoned") .receive_relay(relay_url, src) } @@ -184,7 +184,7 @@ impl NodeMap { ) { if let Some(ep) = self .inner - .lock() + .write() .expect("poisoned") .get_mut(NodeStateKey::Idx(id)) { @@ -195,7 +195,7 @@ impl NodeMap { pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun_rs::TransactionId) { if let Some(ep) = self .inner - .lock() + .write() .expect("poisoned") .get_mut(NodeStateKey::Idx(id)) { @@ -208,7 +208,7 @@ impl NodeMap { node_key: NodeId, ) -> Option { self.inner - .lock() + .read() .expect("poisoned") .get(NodeStateKey::NodeId(node_key)) .map(|ep| *ep.quic_mapped_addr()) @@ -223,14 +223,14 @@ impl NodeMap { tx_id: TransactionId, ) -> PingHandled { self.inner - .lock() + .write() .expect("poisoned") .handle_ping(sender, src, tx_id) } pub(super) fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { self.inner - .lock() + .write() .expect("poisoned") .handle_pong(sender, src, pong) } @@ -243,11 +243,40 @@ impl NodeMap { metrics: &Metrics, ) -> Vec { self.inner - .lock() + .write() .expect("poisoned") .handle_call_me_maybe(sender, cm, metrics) } + pub(super) fn addr_for_send( + &self, + addr: NodeIdMappedAddr, + have_ipv6: bool, + metrics: &Metrics, + ) -> Option<(PublicKey, Option, Option)> { + let mut inner = self.inner.write().expect("poisoned"); + let ep = inner.get_mut(NodeStateKey::NodeIdMappedAddr(addr))?; + let public_key = *ep.public_key(); + trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); + let now = Instant::now(); + let (udp_addr, relay_url) = ep.addr_for_send(&now, have_ipv6, metrics); + Some((public_key, udp_addr, relay_url)) + } + + pub(super) fn peek_addr( + &self, + addr: NodeIdMappedAddr, + have_ipv6: bool, + ) -> Option<(PublicKey, Option, Option)> { + let inner = self.inner.read().expect("poisoned"); + let ep = inner.get(NodeStateKey::NodeIdMappedAddr(addr))?; + let public_key = *ep.public_key(); + trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); + let now = Instant::now(); + let (udp_addr, relay_url) = ep.peek_addr(&now, have_ipv6); + Some((public_key, udp_addr, relay_url)) + } + #[allow(clippy::type_complexity)] pub(super) fn get_send_addrs( &self, @@ -260,7 +289,7 @@ impl NodeMap { Option, Vec, )> { - let mut inner = self.inner.lock().expect("poisoned"); + let mut inner = self.inner.write().expect("poisoned"); let ep = inner.get_mut(NodeStateKey::NodeIdMappedAddr(addr))?; let public_key = *ep.public_key(); trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); @@ -269,21 +298,21 @@ impl NodeMap { } pub(super) fn notify_shutdown(&self) { - let mut inner = self.inner.lock().expect("poisoned"); + let mut inner = self.inner.write().expect("poisoned"); for (_, ep) in inner.node_states_mut() { ep.reset(); } } pub(super) fn reset_node_states(&self) { - let mut inner = self.inner.lock().expect("poisoned"); + let mut inner = self.inner.write().expect("poisoned"); for (_, ep) in inner.node_states_mut() { ep.note_connectivity_change(); } } pub(super) fn nodes_stayin_alive(&self) -> Vec { - let mut inner = self.inner.lock().expect("poisoned"); + let mut inner = self.inner.write().expect("poisoned"); inner .node_states_mut() .flat_map(|(_idx, node_state)| node_state.stayin_alive()) @@ -297,7 +326,7 @@ impl NodeMap { // we were to find this acceptable, dealing with the lifetimes of the mutex's guard and the // internal iterator will be a hassle, if possible at all. self.inner - .lock() + .write() .expect("poisoned") .remote_infos_iter(now) .collect() @@ -315,22 +344,22 @@ impl NodeMap { &self, node_id: NodeId, ) -> anyhow::Result> { - self.inner.lock().expect("poisoned").conn_type(node_id) + self.inner.write().expect("poisoned").conn_type(node_id) } /// Get the [`RemoteInfo`]s for the node identified by [`NodeId`]. pub(super) fn remote_info(&self, node_id: NodeId) -> Option { - self.inner.lock().expect("poisoned").remote_info(node_id) + self.inner.write().expect("poisoned").remote_info(node_id) } /// Prunes nodes without recent activity so that at most [`MAX_INACTIVE_NODES`] are kept. pub(super) fn prune_inactive(&self) { - self.inner.lock().expect("poisoned").prune_inactive(); + self.inner.write().expect("poisoned").prune_inactive(); } pub(crate) fn on_direct_addr_discovered(&self, discovered: BTreeSet) { self.inner - .lock() + .write() .expect("poisoned") .on_direct_addr_discovered(discovered); } @@ -809,7 +838,7 @@ mod tests { let public_key = SecretKey::generate(rand::thread_rng()).public(); let id = node_map .inner - .lock() + .write() .unwrap() .insert_node(Options { node_id: public_key, @@ -834,7 +863,7 @@ mod tests { // add address node_map.add_test_addr(node_addr); // make it active - node_map.inner.lock().unwrap().receive_udp(addr); + node_map.inner.write().unwrap().receive_udp(addr); } info!("Adding offline/inactive addresses"); @@ -844,7 +873,7 @@ mod tests { node_map.add_test_addr(node_addr); } - let mut node_map_inner = node_map.inner.lock().unwrap(); + let mut node_map_inner = node_map.inner.write().unwrap(); let endpoint = node_map_inner.by_id.get_mut(&id).unwrap(); info!("Adding alive addresses"); @@ -885,7 +914,7 @@ mod tests { node_map.add_test_addr(NodeAddr::new(active_node).with_direct_addresses([addr])); node_map .inner - .lock() + .write() .unwrap() .receive_udp(addr) .expect("registered"); @@ -900,7 +929,7 @@ mod tests { assert_eq!(node_map.node_count(), MAX_INACTIVE_NODES + 1); node_map .inner - .lock() + .write() .unwrap() .get(NodeStateKey::NodeId(active_node)) .expect("should not be pruned"); diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index b9c80198cb..1e7c0d35a1 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -277,7 +277,7 @@ impl NodeState { /// Returns the address(es) that should be used for sending the next packet. /// /// This may return to send on one, both or no paths. - fn addr_for_send( + pub(super) fn addr_for_send( &mut self, now: &Instant, have_ipv6: bool, @@ -362,6 +362,40 @@ impl NodeState { (best_addr, relay_url) } + pub(super) fn peek_addr( + &self, + now: &Instant, + have_ipv6: bool, + ) -> (Option, Option) { + #[cfg(any(test, feature = "test-utils"))] + if self.path_selection == PathSelection::RelayOnly { + debug!("in `RelayOnly` mode, giving the relay address as the only viable address for this endpoint"); + return (None, self.relay_url()); + } + match self.udp_paths.peek_addr(*now, have_ipv6) { + UdpSendAddr::Valid(addr) => { + // If we have a valid address we use it. + trace!(%addr, "UdpSendAddr is valid, use it"); + (Some(addr), None) + } + UdpSendAddr::Outdated(addr) => { + // If the address is outdated we use it, but send via relay at the same time. + // We also send disco pings so that it will become valid again if it still + // works (i.e. we don't need to holepunch again). + trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); + (Some(addr), self.relay_url()) + } + UdpSendAddr::Unconfirmed(addr) => { + trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); + (Some(addr), self.relay_url()) + } + UdpSendAddr::None => { + trace!("No UdpSendAddr, use relay"); + (None, self.relay_url()) + } + } + } + /// Removes a direct address for this node. /// /// If this is also the best address, it will be cleared as well. diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 0d96c3bba0..9de41ab8ec 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -134,6 +134,42 @@ impl NodeUdpPaths { } } + pub(super) fn peek_addr(&self, now: Instant, have_ipv6: bool) -> UdpSendAddr { + match self.best_addr.state(now) { + best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), + best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), + best_addr::State::Empty => { + // No direct connection has been used before. If we know of any possible + // candidate addresses, randomly try to use one. This path is most + // effective when folks use a NodeAddr with exactly one direct address which + // they know to work, effectively like using a traditional socket or QUIC + // endpoint. + let addr = self + .chosen_candidate + .and_then(|ipp| self.paths.get(&ipp)) + .and_then(|path| path.udp_addr()) + .filter(|addr| addr.is_ipv4() || have_ipv6); + // .or_else(|| { + // // Look for a new candidate in all the known paths. This may look + // // like a RNG use on the hot-path but this is normally invoked at + // // most most once at startup. + // let addr = self + // .paths + // .values() + // .filter_map(|path| path.udp_addr()) + // .filter(|addr| addr.is_ipv4() || have_ipv6) + // .choose(&mut rand::thread_rng()); + // self.chosen_candidate = addr.map(IpPort::from); + // addr + // }); + match addr { + Some(addr) => UdpSendAddr::Unconfirmed(addr), + None => UdpSendAddr::None, + } + } + } + } + /// Fixup best_addr from candidates. /// /// If somehow we end up in a state where we failed to set a best_addr, while we do have diff --git a/iroh/src/magicsock/udp_conn.rs b/iroh/src/magicsock/udp_conn.rs index 5c61812150..23db4d8bf4 100644 --- a/iroh/src/magicsock/udp_conn.rs +++ b/iroh/src/magicsock/udp_conn.rs @@ -34,7 +34,7 @@ impl UdpConn { } impl AsyncUdpSocket for UdpConn { - fn create_io_poller(self: Arc) -> Pin> { + fn create_io_poller(self: Arc, _remote: SocketAddr) -> Pin> { (*self).create_io_poller() }