diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs index ca4ea755ed..79998f993e 100644 --- a/iroh-relay/src/server/http_server.rs +++ b/iroh-relay/src/server/http_server.rs @@ -45,7 +45,7 @@ type HyperHandler = Box< + 'static, >; -/// WebSocket GUID needed for accepting websocket connections, see RFC 6455 (https://www.rfc-editor.org/rfc/rfc6455) section 1.3 +/// WebSocket GUID needed for accepting websocket connections, see RFC 6455 () section 1.3 const SEC_WEBSOCKET_ACCEPT_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; /// Derives the accept key for WebSocket handshake according to RFC 6455. diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index bc86654019..df97a5d13a 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -41,7 +41,7 @@ use crate::{ }, magicsock::{self, Handle, NodeIdMappedAddr}, metrics::EndpointMetrics, - net_report::Report, + net_report::{NetReporter, Report}, tls, watcher::{self, Watcher}, RelayProtocol, @@ -1051,6 +1051,13 @@ impl Endpoint { self.msock.net_report() } + /// Run a diagnsotic net report that ensures all probe protocols get + /// reported at least once + #[doc(hidden)] + pub async fn run_diagnostic_net_report(&self) -> Result { + self.msock.run_diagnostic_net_report().await + } + /// Returns the local socket addresses on which the underlying sockets are bound. /// /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6 diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 9cda164c82..c29f8be9a7 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -48,7 +48,7 @@ use quinn::{AsyncUdpSocket, ServerConfig}; use rand::{seq::SliceRandom, Rng, SeedableRng}; use relay_actor::RelaySendItem; use smallvec::{smallvec, SmallVec}; -use tokio::sync::{self, mpsc, Mutex}; +use tokio::sync::{self, mpsc, oneshot, Mutex}; use tokio_util::sync::CancellationToken; use tracing::{ debug, error, error_span, event, info, info_span, instrument, trace, trace_span, warn, @@ -75,8 +75,8 @@ use crate::{ discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData}, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, metrics::EndpointMetrics, - net_report::{self, IpMappedAddresses, Report}, - watcher::{self, Watchable}, + net_report::{self, IpMappedAddresses, NetReporter, Report}, + watcher::{self, Watchable, Watcher}, }; mod metrics; @@ -1587,6 +1587,26 @@ impl MagicSock { discovery.publish(&data); } } + + /// Run a net-report, outside of the usual net-report cycle. This is for + /// diagnostic purposes only, and will not effect the usual net-report + /// run cycle nor adjust the + async fn run_diagnostic_net_report(&self) -> Result>> { + let is_running = net_report::Client::is_running(&self.net_reporter).await?; + // wait for any current runs to complete + // before requesting a run + let mut is_running = is_running.stream(); + while let Some(is_running) = is_running.next().await { + if is_running { + continue; + } + break; + } + let (tx, rx) = oneshot::channel(); + let msg = ActorMessage::DiagnosticNetReport(tx); + self.actor_sender.send(msg).await?; + Ok(rx) + } } #[derive(Clone, Debug)] @@ -1985,6 +2005,24 @@ impl Handle { } trace!("magicsock closed"); } + + /// Run a diagnosic net-report check, that waits for all + /// probes to return before ending the run (within the + /// timeout). + /// + /// This does not interfere with the normal net-report run + /// and does not update the known public addresses or + /// adjust the known latency for any relay nodes, it is + /// strictly for diagnostic purposes. + /// + /// Return a [`NetReporter`], allowing you to iterate + /// over all of the returned probes using `.next()`, or + /// you can just `.await` the [`NetReporter`] to get + /// the [`Report`]. + pub(crate) async fn run_diagnostic_net_report(&self) -> Result { + let rx = self.msock.run_diagnostic_net_report().await?; + rx.await? + } } #[derive(Debug, Default)] @@ -2335,6 +2373,7 @@ enum ActorMessage { Shutdown, EndpointPingExpired(usize, stun_rs::TransactionId), NetReport(Result>>, &'static str), + DiagnosticNetReport(oneshot::Sender>), NetworkChange, #[cfg(test)] ForceNetworkChange(bool), @@ -2676,6 +2715,9 @@ impl Actor { ActorMessage::ForceNetworkChange(is_major) => { self.handle_network_change(is_major).await; } + ActorMessage::DiagnosticNetReport(tx) => { + self.net_report_diagnostic(tx).await; + } } false @@ -2876,6 +2918,39 @@ impl Actor { self.net_info_last = Some(ni); } + /// User requested a full diagnosic run of net-report, outside + /// of the normal net report cycle. + #[instrument(level = "debug", skip_all)] + async fn net_report_diagnostic(&mut self, tx: oneshot::Sender>) { + // Don't start a net report probe if we know + // we are shutting down + if self.msock.is_closing() || self.msock.is_closed() { + tx.send(Err(anyhow!( + "magicsock is closed, cancelling net-report diagnostic" + ))) + .ok(); + return; + } + if self.msock.relay_map.is_empty() { + tx.send(Err(anyhow!( + "no relay nodes, cancelling net-report diagnostic" + ))) + .ok(); + return; + } + let relay_map = self.msock.relay_map.clone(); + + #[cfg(wasm_browser)] + let opts = self.net_report_config.clone(); + // run a non-sparse report, meaning the report will ensure + // that each probe protocol response is received for each relay + // before finishing + #[cfg(not(wasm_browser))] + let opts = self.net_report_config.clone().sparse(false); + let res = self.net_reporter.get_report_channel(relay_map, opts).await; + tx.send(res).ok(); + } + /// Calls net_report. /// /// Note that invoking this is managed by [`DirectAddrUpdateState`] via @@ -2908,9 +2983,8 @@ impl Actor { task::spawn(async move { let report = time::timeout(NET_REPORT_TIMEOUT, rx).await; let report: anyhow::Result<_> = match report { - Ok(Ok(Ok(report))) => Ok(Some(report)), - Ok(Ok(Err(err))) => Err(err), - Ok(Err(_)) => Err(anyhow!("net_report report not received")), + Ok(Ok(report)) => Ok(Some(report)), + Ok(Err(err)) => Err(err), Err(err) => Err(anyhow!("net_report report timeout: {:?}", err)), }; msg_sender @@ -3456,9 +3530,7 @@ mod tests { use crate::{ defaults::staging::{self, EU_RELAY_HOSTNAME}, dns::DnsResolver, - tls, - watcher::Watcher as _, - Endpoint, RelayMode, + tls, Endpoint, RelayMode, }; const ALPN: &[u8] = b"n0/test/1"; @@ -4487,4 +4559,28 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn multiple_net_report_runs() -> Result<()> { + let stack = MagicStack::new(RelayMode::Default).await?; + let ep = stack.endpoint; + println!("running multiple net-report checks at once"); + let mut reporter0 = ep.magic_sock().run_diagnostic_net_report().await?; + let mut reporter1 = ep.magic_sock().run_diagnostic_net_report().await?; + let mut set = JoinSet::new(); + set.spawn(async move { + while let Some(probe_result) = reporter0.next().await { + println!("probe from reporter0: {probe_result}"); + } + println!("probe report from reporter0: {:?}", reporter0.await); + }); + set.spawn(async move { + while let Some(probe_result) = reporter1.next().await { + println!("probe from reporter1: {probe_result}"); + } + println!("probe report from reporter1: {:?}", reporter1.await); + }); + set.join_all().await; + Ok(()) + } } diff --git a/iroh/src/net_report.rs b/iroh/src/net_report.rs index aeae00fae1..64b8ce601f 100644 --- a/iroh/src/net_report.rs +++ b/iroh/src/net_report.rs @@ -26,9 +26,11 @@ use iroh_relay::{protos::stun, RelayMap}; use n0_future::{ task::{self, AbortOnDropHandle}, time::{Duration, Instant}, + FutureExt, }; #[cfg(not(wasm_browser))] use netwatch::UdpSocket; +use reportgen::ProbeProto; use tokio::sync::{self, mpsc, oneshot}; use tracing::{debug, error, info_span, trace, warn, Instrument}; @@ -43,6 +45,7 @@ mod reportgen; mod options; +pub use reportgen::ProbeReport; #[cfg(not(wasm_browser))] pub use stun_utils::bind_local_stun_socket; @@ -72,6 +75,8 @@ pub use reportgen::QuicConfig; #[cfg(not(wasm_browser))] use reportgen::SocketState; +use crate::watcher::{self, Watchable}; + const FULL_REPORT_INTERVAL: Duration = Duration::from_secs(5 * 60); /// The maximum latency of all nodes, if none are found yet. @@ -315,10 +320,7 @@ impl Client { #[cfg(wasm_browser)] let opts = Options::default(); let rx = self.get_report_channel(relay_map.clone(), opts).await?; - match rx.await { - Ok(res) => res, - Err(_) => Err(anyhow!("channel closed, actor awol")), - } + rx.await } /// Runs a net_report, returning the report. @@ -328,10 +330,7 @@ impl Client { /// Look at [`Options`] for the different configuration options. pub async fn get_report(&mut self, relay_map: RelayMap, opts: Options) -> Result> { let rx = self.get_report_channel(relay_map, opts).await?; - match rx.await { - Ok(res) => res, - Err(_) => Err(anyhow!("channel closed, actor awol")), - } + rx.await } /// Get report with channel @@ -341,16 +340,115 @@ impl Client { &mut self, relay_map: RelayMap, opts: Options, - ) -> Result>>> { - let (tx, rx) = oneshot::channel(); + ) -> Result { + let (reporter, sender) = NetReporter::new(); self.addr .send(Message::RunCheck { relay_map, opts, - response_tx: tx, + response_tx: sender, }) .await?; - Ok(rx) + Ok(reporter) + } + + /// Watch if a report is currently running. + /// + /// Returns a `Watcher`. If the content is + /// `true`, a report is currently running. If + /// `false`, no report is currentnly running. + pub async fn is_running(addr: &Addr) -> Result> { + let (tx, rx) = oneshot::channel(); + addr.send(Message::TrackRuns(tx)).await?; + let is_running = rx.await?; + Ok(is_running) + } +} + +/// ProbeResult +#[derive(Debug)] +pub struct ProbeResult { + proto: ProbeProto, + relay_url: RelayUrl, + addr: Option, + latency: Option, +} + +impl From for ProbeResult { + fn from(value: ProbeReport) -> Self { + Self { + proto: value.proto(), + relay_url: value.relay_url(), + addr: value.addr, + latency: value.latency, + } + } +} + +impl fmt::Display for ProbeResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut s = format!("{} probe to {}", self.proto, self.relay_url); + if let Some(latency) = self.latency { + s.push_str(&format!(" successful ({:?})", latency)); + } else { + s.push_str(" unsuccessful"); + } + + if let Some(addr) = self.addr { + s.push_str(&format!(", public address found: {}", addr)); + } + write!(f, "{s}") + } +} + +/// NetReporter allows you to track and receive information about the probes +/// and report in the current net-report run. +/// +/// ProbeReports may be dropped if the NetReporter is not polled. +#[derive(Debug)] +pub struct NetReporter { + recv: mpsc::Receiver, + report: oneshot::Receiver>>, +} + +#[derive(Debug)] +pub(crate) struct ProbeSender { + send: mpsc::Sender, + report: oneshot::Sender>>, +} + +impl NetReporter { + fn new() -> (Self, ProbeSender) { + let (send, recv) = mpsc::channel(10); + let (tx, rx) = oneshot::channel(); + (Self { recv, report: rx }, ProbeSender { send, report: tx }) + } +} + +impl n0_future::Stream for NetReporter { + type Item = ProbeResult; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.recv.poll_recv(cx) + } +} + +impl n0_future::Future for NetReporter { + type Output = Result>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + match self.report.poll(cx) { + std::task::Poll::Ready(Ok(result)) => std::task::Poll::Ready(result), + std::task::Poll::Ready(Err(err)) => { + std::task::Poll::Ready(Err(anyhow::Error::from(err))) + } + std::task::Poll::Pending => std::task::Poll::Pending, + } } } @@ -377,7 +475,7 @@ pub(crate) enum Message { /// Options for the report opts: Options, /// Channel to receive the response. - response_tx: oneshot::Sender>>, + response_tx: ProbeSender, }, /// A report produced by the [`reportgen`] actor. ReportReady { report: Box }, @@ -395,6 +493,8 @@ pub(crate) enum Message { /// The sender is signalled once the STUN packet is registered with the actor and will /// correctly accept the STUN response. InFlightStun(Inflight, oneshot::Sender<()>), + /// Track whether a report is running. + TrackRuns(oneshot::Sender>), } /// Sender to the main service. @@ -481,6 +581,9 @@ struct Actor { #[cfg(not(wasm_browser))] ip_mapped_addrs: Option, metrics: Arc, + + /// Is a report currently running? + is_running: Watchable, } impl Actor { @@ -509,6 +612,7 @@ impl Actor { #[cfg(not(wasm_browser))] ip_mapped_addrs, metrics, + is_running: Watchable::new(false), }) } @@ -548,6 +652,9 @@ impl Actor { Message::InFlightStun(inflight, response_tx) => { self.handle_in_flight_stun(inflight, response_tx); } + Message::TrackRuns(rx) => { + rx.send(self.is_running.watch()).ok(); + } } } } @@ -557,12 +664,7 @@ impl Actor { /// If *stun_sock_v4* or *stun_sock_v6* are not provided this will bind the sockets /// itself. This is not ideal since really you want to send STUN probes from the /// sockets you will be using. - fn handle_run_check( - &mut self, - relay_map: RelayMap, - opts: Options, - response_tx: oneshot::Sender>>, - ) { + fn handle_run_check(&mut self, relay_map: RelayMap, opts: Options, response_tx: ProbeSender) { let protocols = opts.to_protocols(); #[cfg(not(wasm_browser))] let socket_state = SocketState { @@ -574,8 +676,9 @@ impl Actor { ip_mapped_addrs: self.ip_mapped_addrs.clone(), }; trace!("Attempting probes for protocols {protocols:#?}"); - if self.current_report_run.is_some() { + if self.is_running.get() { response_tx + .report .send(Err(anyhow!( "ignoring RunCheck request: reportgen actor already running" ))) @@ -583,6 +686,8 @@ impl Actor { return; } + self.is_running.set(true).expect("checked"); + let now = Instant::now(); let mut do_full = self.reports.next_full @@ -604,6 +709,11 @@ impl Actor { } self.metrics.reports.inc(); + let ProbeSender { + report: report_tx, + send: probes_tx, + } = response_tx; + let actor = reportgen::Client::new( self.addr(), self.reports.last.clone(), @@ -612,11 +722,13 @@ impl Actor { self.metrics.clone(), #[cfg(not(wasm_browser))] socket_state, + probes_tx, + opts.sparse, ); self.current_report_run = Some(ReportRun { _reportgen: actor, - report_tx: response_tx, + report_tx, }); } @@ -626,6 +738,7 @@ impl Actor { if let Some(ReportRun { report_tx, .. }) = self.current_report_run.take() { report_tx.send(Ok(report)).ok(); } + self.is_running.set(false).ok(); } fn handle_report_aborted(&mut self, err: anyhow::Error) { @@ -633,6 +746,7 @@ impl Actor { if let Some(ReportRun { report_tx, .. }) = self.current_report_run.take() { report_tx.send(Err(err.context("report aborted"))).ok(); } + self.is_running.set(false).ok(); } /// Handles [`Message::StunPacket`]. diff --git a/iroh/src/net_report/options.rs b/iroh/src/net_report/options.rs index ba55a61453..2ac733d432 100644 --- a/iroh/src/net_report/options.rs +++ b/iroh/src/net_report/options.rs @@ -50,6 +50,15 @@ mod imp { /// /// On by default pub(crate) https: bool, + /// Run probes until enough probes have + /// been returned to generate a report, + /// and no longer. + /// + /// When `false`, the report will run + /// until all probe/relay node + /// combinations have been returned, or + /// timeout occurs + pub(crate) sparse: bool, } impl Default for Options { @@ -61,6 +70,7 @@ mod imp { icmp_v4: true, icmp_v6: true, https: true, + sparse: true, } } } @@ -75,6 +85,7 @@ mod imp { icmp_v4: false, icmp_v6: false, https: false, + sparse: true, } } @@ -114,6 +125,20 @@ mod imp { self } + /// When `true`, the report will run until + /// enough probes have returned to generate + /// a full report, or timeout occurs. + /// + /// When `false`, the report will run until + /// all probe protocol/relay node combinations + /// have been returned, or timeout occurs. + /// + /// The default is `true`. + pub fn sparse(mut self, sparse: bool) -> Self { + self.sparse = sparse; + self + } + /// Turn the options into set of valid protocols pub(crate) fn to_protocols(&self) -> BTreeSet { let mut protocols = BTreeSet::new(); diff --git a/iroh/src/net_report/reportgen.rs b/iroh/src/net_report/reportgen.rs index ba27d9ea88..d77945e8e6 100644 --- a/iroh/src/net_report/reportgen.rs +++ b/iroh/src/net_report/reportgen.rs @@ -67,6 +67,7 @@ mod probes; pub use probes::ProbeProto; use probes::{Probe, ProbePlan}; +use super::ProbeResult; use crate::net_report::defaults::timeouts::{ CAPTIVE_PORTAL_DELAY, CAPTIVE_PORTAL_TIMEOUT, OVERALL_REPORT_TIMEOUT, PROBES_TIMEOUT, }; @@ -107,6 +108,14 @@ impl Client { /// /// The actor starts running immediately and only generates a single report, after which /// it shuts down. Dropping this handle will abort the actor. + /// + /// When `sparse` is `true`, the report will run until it + /// has processed enough probes to return a + /// full report. + /// + /// When `false`, the report will ensure that one of each probe protocol/relay + /// combination is processed (within the timeout). + #[allow(clippy::too_many_arguments)] pub(super) fn new( net_report: net_report::Addr, last_report: Option>, @@ -114,6 +123,8 @@ impl Client { protocols: BTreeSet, metrics: Arc, #[cfg(not(wasm_browser))] socket_state: SocketState, + probes_tx: mpsc::Sender, + sparse: bool, ) -> Self { let (msg_tx, msg_rx) = mpsc::channel(32); let addr = Addr { @@ -128,6 +139,8 @@ impl Client { report: Report::default(), outstanding_tasks: OutstandingTasks::default(), protocols, + reported_probes: (!sparse).then_some(Default::default()), + probes_tx, #[cfg(not(wasm_browser))] socket_state, #[cfg(not(wasm_browser))] @@ -206,6 +219,10 @@ struct Actor { /// Protocols we should attempt to create probes for, if we have the correct /// configuration for that protocol. protocols: BTreeSet, + /// When running a "full" report, a report should try to get one response from each protocol, per relay URL, before returning. + reported_probes: Option>, + /// Sender to send `ProbeReport`s + probes_tx: mpsc::Sender, /// Any socket-related state that doesn't exist/work in browsers #[cfg(not(wasm_browser))] @@ -373,6 +390,20 @@ impl Actor { fn handle_probe_report(&mut self, probe_report: ProbeReport) { debug!(?probe_report, "finished probe"); + if let Err(e) = self.probes_tx.try_send(probe_report.clone().into()) { + warn!( + "unable to send the probe report for probe {} to + {} to the probe sender: {e:?}", + probe_report.probe.proto(), + probe_report.probe.node().url + ); + } + if let Some(reported_probes) = self.reported_probes.as_mut() { + reported_probes.insert(( + probe_report.probe.proto(), + probe_report.probe.node().url.clone(), + )); + } update_report(&mut self.report, probe_report); // When we discover the first IPv4 address we want to start the hairpin actor. @@ -419,6 +450,14 @@ impl Actor { /// Whether running this probe would still improve our report. fn probe_would_help(&mut self, probe: Probe, relay_node: Arc) -> bool { + // If we want to do a "full" probe, and don't yet have a probe report for this + // probe protocol / relay node combination, that would help. + if let Some(reported_probes) = self.reported_probes.as_ref() { + if reported_probes.contains(&(probe.proto(), relay_node.url.clone())) { + return true; + } + } + // If the probe is for a relay we don't yet know about, that would help. if self.report.relay_latency.get(&relay_node.url).is_none() { return true; @@ -686,23 +725,23 @@ impl OutstandingTasks { } } -/// The success result of [`run_probe`]. +/// The success result of `run_probe`. #[derive(Debug, Clone)] -struct ProbeReport { +pub struct ProbeReport { /// Whether we can send IPv4 UDP packets. - ipv4_can_send: bool, + pub(crate) ipv4_can_send: bool, /// Whether we can send IPv6 UDP packets. - ipv6_can_send: bool, + pub(crate) ipv6_can_send: bool, /// Whether we can send ICMPv4 packets, `None` if not checked. - icmpv4: Option, + pub(crate) icmpv4: Option, /// Whether we can send ICMPv6 packets, `None` if not checked. - icmpv6: Option, + pub(crate) icmpv6: Option, /// The latency to the relay node. - latency: Option, + pub(crate) latency: Option, /// The probe that generated this report. probe: Probe, /// The discovered public address. - addr: Option, + pub(crate) addr: Option, } impl ProbeReport { @@ -717,6 +756,16 @@ impl ProbeReport { addr: None, } } + + /// Returns the protocol used for this probe. + pub fn proto(&self) -> ProbeProto { + self.probe.proto() + } + + /// Returns the relay url of the relay this probe is meant for. + pub fn relay_url(&self) -> RelayUrl { + self.probe.node().url.clone() + } } /// Errors for [`run_probe`].