Skip to content

WIP: feat(iroh): run_diagnostic_net_report runs a net-report and returns the completed probes #3311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-relay/src/server/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type HyperHandler = Box<
+ 'static,
>;

/// WebSocket GUID needed for accepting websocket connections, see RFC 6455 (https://www.rfc-editor.org/rfc/rfc6455) section 1.3
/// WebSocket GUID needed for accepting websocket connections, see RFC 6455 (<https://www.rfc-editor.org/rfc/rfc6455>) section 1.3
const SEC_WEBSOCKET_ACCEPT_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// Derives the accept key for WebSocket handshake according to RFC 6455.
Expand Down
9 changes: 8 additions & 1 deletion iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
magicsock::{self, Handle, NodeIdMappedAddr},
metrics::EndpointMetrics,
net_report::Report,
net_report::{NetReporter, Report},
tls,
watcher::{self, Watcher},
RelayProtocol,
Expand Down Expand Up @@ -1051,6 +1051,13 @@ impl Endpoint {
self.msock.net_report()
}

/// Run a diagnsotic net report that ensures all probe protocols get
/// reported at least once
#[doc(hidden)]
pub async fn run_diagnostic_net_report(&self) -> Result<NetReporter> {
self.msock.run_diagnostic_net_report().await
}

/// Returns the local socket addresses on which the underlying sockets are bound.
///
/// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
Expand Down
114 changes: 105 additions & 9 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use quinn::{AsyncUdpSocket, ServerConfig};
use rand::{seq::SliceRandom, Rng, SeedableRng};
use relay_actor::RelaySendItem;
use smallvec::{smallvec, SmallVec};
use tokio::sync::{self, mpsc, Mutex};
use tokio::sync::{self, mpsc, oneshot, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{
debug, error, error_span, event, info, info_span, instrument, trace, trace_span, warn,
Expand All @@ -75,8 +75,8 @@ use crate::{
discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData},
key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret},
metrics::EndpointMetrics,
net_report::{self, IpMappedAddresses, Report},
watcher::{self, Watchable},
net_report::{self, IpMappedAddresses, NetReporter, Report},
watcher::{self, Watchable, Watcher},
};

mod metrics;
Expand Down Expand Up @@ -1587,6 +1587,26 @@ impl MagicSock {
discovery.publish(&data);
}
}

/// Run a net-report, outside of the usual net-report cycle. This is for
/// diagnostic purposes only, and will not effect the usual net-report
/// run cycle nor adjust the
async fn run_diagnostic_net_report(&self) -> Result<oneshot::Receiver<Result<NetReporter>>> {
let is_running = net_report::Client::is_running(&self.net_reporter).await?;
// wait for any current runs to complete
// before requesting a run
let mut is_running = is_running.stream();
while let Some(is_running) = is_running.next().await {
if is_running {
continue;
}
break;
}
let (tx, rx) = oneshot::channel();
let msg = ActorMessage::DiagnosticNetReport(tx);
self.actor_sender.send(msg).await?;
Ok(rx)
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -1985,6 +2005,24 @@ impl Handle {
}
trace!("magicsock closed");
}

/// Run a diagnosic net-report check, that waits for all
/// probes to return before ending the run (within the
/// timeout).
///
/// This does not interfere with the normal net-report run
/// and does not update the known public addresses or
/// adjust the known latency for any relay nodes, it is
/// strictly for diagnostic purposes.
///
/// Return a [`NetReporter`], allowing you to iterate
/// over all of the returned probes using `.next()`, or
/// you can just `.await` the [`NetReporter`] to get
/// the [`Report`].
pub(crate) async fn run_diagnostic_net_report(&self) -> Result<NetReporter> {
let rx = self.msock.run_diagnostic_net_report().await?;
rx.await?
}
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -2335,6 +2373,7 @@ enum ActorMessage {
Shutdown,
EndpointPingExpired(usize, stun_rs::TransactionId),
NetReport(Result<Option<Arc<net_report::Report>>>, &'static str),
DiagnosticNetReport(oneshot::Sender<anyhow::Result<NetReporter>>),
NetworkChange,
#[cfg(test)]
ForceNetworkChange(bool),
Expand Down Expand Up @@ -2676,6 +2715,9 @@ impl Actor {
ActorMessage::ForceNetworkChange(is_major) => {
self.handle_network_change(is_major).await;
}
ActorMessage::DiagnosticNetReport(tx) => {
self.net_report_diagnostic(tx).await;
}
}

false
Expand Down Expand Up @@ -2876,6 +2918,39 @@ impl Actor {
self.net_info_last = Some(ni);
}

/// User requested a full diagnosic run of net-report, outside
/// of the normal net report cycle.
#[instrument(level = "debug", skip_all)]
async fn net_report_diagnostic(&mut self, tx: oneshot::Sender<Result<NetReporter>>) {
// Don't start a net report probe if we know
// we are shutting down
if self.msock.is_closing() || self.msock.is_closed() {
tx.send(Err(anyhow!(
"magicsock is closed, cancelling net-report diagnostic"
)))
.ok();
return;
}
if self.msock.relay_map.is_empty() {
tx.send(Err(anyhow!(
"no relay nodes, cancelling net-report diagnostic"
)))
.ok();
return;
}
let relay_map = self.msock.relay_map.clone();

#[cfg(wasm_browser)]
let opts = self.net_report_config.clone();
// run a non-sparse report, meaning the report will ensure
// that each probe protocol response is received for each relay
// before finishing
#[cfg(not(wasm_browser))]
let opts = self.net_report_config.clone().sparse(false);
let res = self.net_reporter.get_report_channel(relay_map, opts).await;
tx.send(res).ok();
}

/// Calls net_report.
///
/// Note that invoking this is managed by [`DirectAddrUpdateState`] via
Expand Down Expand Up @@ -2908,9 +2983,8 @@ impl Actor {
task::spawn(async move {
let report = time::timeout(NET_REPORT_TIMEOUT, rx).await;
let report: anyhow::Result<_> = match report {
Ok(Ok(Ok(report))) => Ok(Some(report)),
Ok(Ok(Err(err))) => Err(err),
Ok(Err(_)) => Err(anyhow!("net_report report not received")),
Ok(Ok(report)) => Ok(Some(report)),
Ok(Err(err)) => Err(err),
Err(err) => Err(anyhow!("net_report report timeout: {:?}", err)),
};
msg_sender
Expand Down Expand Up @@ -3456,9 +3530,7 @@ mod tests {
use crate::{
defaults::staging::{self, EU_RELAY_HOSTNAME},
dns::DnsResolver,
tls,
watcher::Watcher as _,
Endpoint, RelayMode,
tls, Endpoint, RelayMode,
};

const ALPN: &[u8] = b"n0/test/1";
Expand Down Expand Up @@ -4487,4 +4559,28 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn multiple_net_report_runs() -> Result<()> {
let stack = MagicStack::new(RelayMode::Default).await?;
let ep = stack.endpoint;
println!("running multiple net-report checks at once");
let mut reporter0 = ep.magic_sock().run_diagnostic_net_report().await?;
let mut reporter1 = ep.magic_sock().run_diagnostic_net_report().await?;
let mut set = JoinSet::new();
set.spawn(async move {
while let Some(probe_result) = reporter0.next().await {
println!("probe from reporter0: {probe_result}");
}
println!("probe report from reporter0: {:?}", reporter0.await);
});
set.spawn(async move {
while let Some(probe_result) = reporter1.next().await {
println!("probe from reporter1: {probe_result}");
}
println!("probe report from reporter1: {:?}", reporter1.await);
});
set.join_all().await;
Ok(())
}
}
Loading
Loading