diff --git a/Cargo.lock b/Cargo.lock index b9250e5..476a4e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,13 +1103,13 @@ dependencies = [ [[package]] name = "n0-watcher" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "813443be5d70d9ed05ad04577f0db9fa1d655336911d2ac271b63eb2da363ea1" +checksum = "f216d4ebc5fcf9548244803cbb93f488a2ae160feba3706cd17040d69cf7a368" dependencies = [ "derive_more", "n0-future", - "thiserror 2.0.12", + "snafu", ] [[package]] @@ -1239,6 +1239,7 @@ dependencies = [ "netlink-packet-route 0.23.0", "netlink-proto", "netlink-sys", + "pin-project-lite", "serde", "snafu", "socket2", @@ -1724,18 +1725,18 @@ checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" [[package]] name = "snafu" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019" +checksum = "320b01e011bf8d5d7a4a4a4be966d9160968935849c83b918827f6a435e7f627" dependencies = [ "snafu-derive", ] [[package]] name = "snafu-derive" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" +checksum = "1961e2ef424c1424204d3a5d6975f934f56b6d50ff5732382d84ebf460e147f7" dependencies = [ "heck", "proc-macro2", diff --git a/netwatch/Cargo.toml b/netwatch/Cargo.toml index 5c3cd6a..3ec833f 100644 --- a/netwatch/Cargo.toml +++ b/netwatch/Cargo.toml @@ -19,8 +19,9 @@ workspace = true atomic-waker = "1.1.2" bytes = "1.7" n0-future = "0.1.3" -n0-watcher = "0.1" +n0-watcher = "0.2" nested_enum_utils = "0.2.0" +pin-project-lite = "0.2.16" snafu = "0.8.5" time = "0.3.20" tokio = { version = "1", features = [ diff --git a/netwatch/src/interfaces.rs b/netwatch/src/interfaces.rs index d4a8ed2..288d5cb 100644 --- a/netwatch/src/interfaces.rs +++ b/netwatch/src/interfaces.rs @@ -258,15 +258,6 @@ impl State { } /// Is this a major change compared to the `old` one?. - #[cfg(wasm_browser)] - pub fn is_major_change(&self, old: &State) -> bool { - // All changes are major. - // In the browser, there only are changes from online to offline - self != old - } - - /// Is this a major change compared to the `old` one?. - #[cfg(not(wasm_browser))] pub fn is_major_change(&self, old: &State) -> bool { if self.have_v6 != old.have_v6 || self.have_v4 != old.have_v4 diff --git a/netwatch/src/interfaces/wasm_browser.rs b/netwatch/src/interfaces/wasm_browser.rs index 38c3440..d949359 100644 --- a/netwatch/src/interfaces/wasm_browser.rs +++ b/netwatch/src/interfaces/wasm_browser.rs @@ -115,4 +115,11 @@ impl State { pac: None, } } + + /// Is this a major change compared to the `old` one?. + pub fn is_major_change(&self, old: &State) -> bool { + // All changes are major. + // In the browser, there only are changes from online to offline + self != old + } } diff --git a/netwatch/src/lib.rs b/netwatch/src/lib.rs index d26af9e..5cd2c9f 100644 --- a/netwatch/src/lib.rs +++ b/netwatch/src/lib.rs @@ -10,4 +10,4 @@ mod udp; pub use self::ip_family::IpFamily; #[cfg(not(wasm_browser))] -pub use self::udp::UdpSocket; +pub use self::udp::{UdpSender, UdpSocket}; diff --git a/netwatch/src/udp.rs b/netwatch/src/udp.rs index d0d9649..f100f4d 100644 --- a/netwatch/src/udp.rs +++ b/netwatch/src/udp.rs @@ -3,7 +3,7 @@ use std::{ io, net::SocketAddr, pin::Pin, - sync::{atomic::AtomicBool, RwLock, RwLockReadGuard, TryLockError}, + sync::{atomic::AtomicBool, Arc, RwLock, RwLockReadGuard, TryLockError}, task::{Context, Poll}, }; @@ -321,7 +321,7 @@ impl UdpSocket { panic!("lock poisoned: {:?}", e); } Err(TryLockError::WouldBlock) => { - return Err(io::Error::new(io::ErrorKind::WouldBlock, "")); + return Err(io::Error::new(io::ErrorKind::WouldBlock, "locked")); } }; let (socket, state) = guard.try_get_connected()?; @@ -340,6 +340,50 @@ impl UdpSocket { } } + /// poll send a quinn based `Transmit`. + pub fn poll_send_quinn( + &self, + cx: &mut Context, + transmit: &Transmit<'_>, + ) -> Poll> { + loop { + if let Err(err) = self.maybe_rebind() { + return Poll::Ready(Err(err)); + } + + let guard = n0_future::ready!(self.poll_read_socket(&self.send_waker, cx)); + let (socket, state) = guard.try_get_connected()?; + + match socket.poll_send_ready(cx) { + Poll::Pending => { + self.send_waker.register(cx.waker()); + return Poll::Pending; + } + Poll::Ready(Ok(())) => { + let res = + socket.try_io(Interest::WRITABLE, || state.send(socket.into(), transmit)); + if let Err(err) = res { + if err.kind() == io::ErrorKind::WouldBlock { + continue; + } + + if let Some(err) = self.handle_write_error(err) { + return Poll::Ready(Err(err)); + } + continue; + } + return Poll::Ready(res); + } + Poll::Ready(Err(err)) => { + if let Some(err) = self.handle_write_error(err) { + return Poll::Ready(Err(err)); + } + continue; + } + } + } + } + /// quinn based `poll_recv` pub fn poll_recv_quinn( &self, @@ -401,6 +445,11 @@ impl UdpSocket { } } + /// Creates a [`UdpSender`] sender. + pub fn create_sender(self: Arc) -> UdpSender { + UdpSender::new(self.clone()) + } + /// Whether transmitted datagrams might get fragmented by the IP layer /// /// Returns `false` on targets which employ e.g. the `IPV6_DONTFRAG` socket option. @@ -806,6 +855,151 @@ impl Drop for UdpSocket { } } +pin_project_lite::pin_project! { + pub struct UdpSender { + socket: Arc, + #[pin] + fut: Option> + Send + Sync + 'static>>>, + } +} + +impl std::fmt::Debug for UdpSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("UdpSender") + } +} + +impl UdpSender { + fn new(socket: Arc) -> Self { + Self { socket, fut: None } + } + + /// Async sending + pub fn send<'a, 'b>(&self, transmit: &'a quinn_udp::Transmit<'b>) -> SendFutQuinn<'a, 'b> { + SendFutQuinn { + socket: self.socket.clone(), + transmit, + } + } + + /// Poll send + pub fn poll_send( + self: Pin<&mut Self>, + transmit: &quinn_udp::Transmit, + cx: &mut Context, + ) -> Poll> { + let mut this = self.project(); + loop { + if let Err(err) = this.socket.maybe_rebind() { + return Poll::Ready(Err(err)); + } + + let guard = + n0_future::ready!(this.socket.poll_read_socket(&this.socket.send_waker, cx)); + + if this.fut.is_none() { + let socket = this.socket.clone(); + this.fut.set(Some(Box::pin(async move { + n0_future::future::poll_fn(|cx| socket.poll_writable(cx)).await + }))); + } + // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely + // obtain an `&mut Fut` after storing it in `this.fut` when `this` is already behind `Pin`, + // and if we didn't store it then we wouldn't be able to keep it alive between + // `poll_writable` calls. + let result = n0_future::ready!(this.fut.as_mut().as_pin_mut().unwrap().poll(cx)); + + // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for + // a new `Future` to be created on the next call. + this.fut.set(None); + + // If .writable() fails, propagate the error + result?; + + let (socket, state) = guard.try_get_connected()?; + let result = socket.try_io(Interest::WRITABLE, || state.send(socket.into(), transmit)); + + match result { + // We thought the socket was writable, but it wasn't, then retry so that either another + // `writable().await` call determines that the socket is indeed not writable and + // registers us for a wakeup, or the send succeeds if this really was just a + // transient failure. + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + // In all other cases, either propagate the error or we're Ok + _ => return Poll::Ready(result), + } + } + } + + /// Best effort sending + pub fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> { + self.socket.maybe_rebind()?; + + match self.socket.socket.try_read() { + Ok(guard) => { + let (socket, state) = guard.try_get_connected()?; + socket.try_io(Interest::WRITABLE, || state.send(socket.into(), transmit)) + } + Err(TryLockError::Poisoned(e)) => panic!("socket lock poisoned: {e}"), + Err(TryLockError::WouldBlock) => { + Err(io::Error::new(io::ErrorKind::WouldBlock, "locked")) + } + } + } +} + +/// Send future quinn +#[derive(Debug)] +pub struct SendFutQuinn<'a, 'b> { + socket: Arc, + transmit: &'a quinn_udp::Transmit<'b>, +} + +impl Future for SendFutQuinn<'_, '_> { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + loop { + if let Err(err) = self.socket.maybe_rebind() { + return Poll::Ready(Err(err)); + } + + let guard = + n0_future::ready!(self.socket.poll_read_socket(&self.socket.send_waker, cx)); + let (socket, state) = guard.try_get_connected()?; + + match socket.poll_send_ready(cx) { + Poll::Pending => { + self.socket.send_waker.register(cx.waker()); + return Poll::Pending; + } + Poll::Ready(Ok(())) => { + let res = socket.try_io(Interest::WRITABLE, || { + state.send(socket.into(), self.transmit) + }); + + if let Err(err) = res { + if err.kind() == io::ErrorKind::WouldBlock { + continue; + } + if let Some(err) = self.socket.handle_write_error(err) { + return Poll::Ready(Err(err)); + } + continue; + } + return Poll::Ready(res); + } + Poll::Ready(Err(err)) => { + if let Some(err) = self.socket.handle_write_error(err) { + return Poll::Ready(Err(err)); + } + continue; + } + } + } + } +} + #[cfg(test)] mod tests { use testresult::TestResult;