Skip to content

feat(netwatch): adjust for quinn api changes #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions netwatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bytes = "1.7"
n0-future = "0.1.3"
n0-watcher = "0.1"
nested_enum_utils = "0.2.0"
pin-project-lite = "0.2.16"
snafu = "0.8.5"
time = "0.3.20"
tokio = { version = "1", features = [
Expand Down
9 changes: 0 additions & 9 deletions netwatch/src/interfaces.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the whole confusion here.
It might be helpful to move this file to interfaces/native.rs so it lives on the same level as wasm_browser.rs.
The hope would be this makes it a lot clearer that they implement the same stuff.

Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,6 @@ impl State {
}

/// Is this a major change compared to the `old` one?.
#[cfg(wasm_browser)]
pub fn is_major_change(&self, old: &State) -> bool {
// All changes are major.
// In the browser, there only are changes from online to offline
self != old
}

/// Is this a major change compared to the `old` one?.
#[cfg(not(wasm_browser))]
pub fn is_major_change(&self, old: &State) -> bool {
if self.have_v6 != old.have_v6
|| self.have_v4 != old.have_v4
Expand Down
7 changes: 7 additions & 0 deletions netwatch/src/interfaces/wasm_browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,11 @@ impl State {
pac: None,
}
}

/// Is this a major change compared to the `old` one?.
pub fn is_major_change(&self, old: &State) -> bool {
// All changes are major.
// In the browser, there only are changes from online to offline
self != old
}
}
2 changes: 1 addition & 1 deletion netwatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ mod udp;

pub use self::ip_family::IpFamily;
#[cfg(not(wasm_browser))]
pub use self::udp::UdpSocket;
pub use self::udp::{UdpSender, UdpSocket};
198 changes: 196 additions & 2 deletions netwatch/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
io,
net::SocketAddr,
pin::Pin,
sync::{atomic::AtomicBool, RwLock, RwLockReadGuard, TryLockError},
sync::{atomic::AtomicBool, Arc, RwLock, RwLockReadGuard, TryLockError},
task::{Context, Poll},
};

Expand Down Expand Up @@ -321,7 +321,7 @@ impl UdpSocket {
panic!("lock poisoned: {:?}", e);
}
Err(TryLockError::WouldBlock) => {
return Err(io::Error::new(io::ErrorKind::WouldBlock, ""));
return Err(io::Error::new(io::ErrorKind::WouldBlock, "locked"));
}
};
let (socket, state) = guard.try_get_connected()?;
Expand All @@ -340,6 +340,50 @@ impl UdpSocket {
}
}

/// poll send a quinn based `Transmit`.
pub fn poll_send_quinn(
&self,
cx: &mut Context,
transmit: &Transmit<'_>,
) -> Poll<io::Result<()>> {
loop {
if let Err(err) = self.maybe_rebind() {
return Poll::Ready(Err(err));
}

let guard = n0_future::ready!(self.poll_read_socket(&self.send_waker, cx));
let (socket, state) = guard.try_get_connected()?;

match socket.poll_send_ready(cx) {
Poll::Pending => {
self.send_waker.register(cx.waker());
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
let res =
socket.try_io(Interest::WRITABLE, || state.send(socket.into(), transmit));
if let Err(err) = res {
if err.kind() == io::ErrorKind::WouldBlock {
continue;
}

if let Some(err) = self.handle_write_error(err) {
return Poll::Ready(Err(err));
}
continue;
}
return Poll::Ready(res);
}
Poll::Ready(Err(err)) => {
if let Some(err) = self.handle_write_error(err) {
return Poll::Ready(Err(err));
}
continue;
}
}
}
}

/// quinn based `poll_recv`
pub fn poll_recv_quinn(
&self,
Expand Down Expand Up @@ -401,6 +445,11 @@ impl UdpSocket {
}
}

/// Creates a [`UdpSender`] sender.
pub fn create_sender(self: Arc<Self>) -> UdpSender {
UdpSender::new(self.clone())
}

/// Whether transmitted datagrams might get fragmented by the IP layer
///
/// Returns `false` on targets which employ e.g. the `IPV6_DONTFRAG` socket option.
Expand Down Expand Up @@ -806,6 +855,151 @@ impl Drop for UdpSocket {
}
}

pin_project_lite::pin_project! {
pub struct UdpSender {
socket: Arc<UdpSocket>,
#[pin]
fut: Option<Pin<Box<dyn Future<Output = io::Result<()>> + Send + Sync + 'static>>>,
}
}

impl std::fmt::Debug for UdpSender {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("UdpSender")
}
}

impl UdpSender {
fn new(socket: Arc<UdpSocket>) -> Self {
Self { socket, fut: None }
}

/// Async sending
pub fn send<'a, 'b>(&self, transmit: &'a quinn_udp::Transmit<'b>) -> SendFutQuinn<'a, 'b> {
SendFutQuinn {
socket: self.socket.clone(),
transmit,
}
}

/// Poll send
pub fn poll_send(
self: Pin<&mut Self>,
transmit: &quinn_udp::Transmit,
cx: &mut Context,
) -> Poll<io::Result<()>> {
let mut this = self.project();
loop {
if let Err(err) = this.socket.maybe_rebind() {
return Poll::Ready(Err(err));
}

let guard =
n0_future::ready!(this.socket.poll_read_socket(&this.socket.send_waker, cx));

if this.fut.is_none() {
let socket = this.socket.clone();
this.fut.set(Some(Box::pin(async move {
n0_future::future::poll_fn(|cx| socket.poll_writable(cx)).await
})));
}
// We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
// obtain an `&mut Fut` after storing it in `this.fut` when `this` is already behind `Pin`,
// and if we didn't store it then we wouldn't be able to keep it alive between
// `poll_writable` calls.
let result = n0_future::ready!(this.fut.as_mut().as_pin_mut().unwrap().poll(cx));

// Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
// a new `Future` to be created on the next call.
this.fut.set(None);

// If .writable() fails, propagate the error
result?;

let (socket, state) = guard.try_get_connected()?;
let result = socket.try_io(Interest::WRITABLE, || state.send(socket.into(), transmit));

match result {
// We thought the socket was writable, but it wasn't, then retry so that either another
// `writable().await` call determines that the socket is indeed not writable and
// registers us for a wakeup, or the send succeeds if this really was just a
// transient failure.
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
// In all other cases, either propagate the error or we're Ok
_ => return Poll::Ready(result),
}
}
}

/// Best effort sending
pub fn try_send(&self, transmit: &quinn_udp::Transmit) -> io::Result<()> {
self.socket.maybe_rebind()?;

match self.socket.socket.try_read() {
Ok(guard) => {
let (socket, state) = guard.try_get_connected()?;
socket.try_io(Interest::WRITABLE, || state.send(socket.into(), transmit))
}
Err(TryLockError::Poisoned(e)) => panic!("socket lock poisoned: {e}"),
Err(TryLockError::WouldBlock) => {
Err(io::Error::new(io::ErrorKind::WouldBlock, "locked"))
}
}
}
}

/// Send future quinn
#[derive(Debug)]
pub struct SendFutQuinn<'a, 'b> {
socket: Arc<UdpSocket>,
transmit: &'a quinn_udp::Transmit<'b>,
}

impl Future for SendFutQuinn<'_, '_> {
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
loop {
if let Err(err) = self.socket.maybe_rebind() {
return Poll::Ready(Err(err));
}

let guard =
n0_future::ready!(self.socket.poll_read_socket(&self.socket.send_waker, cx));
let (socket, state) = guard.try_get_connected()?;

match socket.poll_send_ready(cx) {
Poll::Pending => {
self.socket.send_waker.register(cx.waker());
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
let res = socket.try_io(Interest::WRITABLE, || {
state.send(socket.into(), self.transmit)
});

if let Err(err) = res {
if err.kind() == io::ErrorKind::WouldBlock {
continue;
}
if let Some(err) = self.socket.handle_write_error(err) {
return Poll::Ready(Err(err));
}
continue;
}
return Poll::Ready(res);
}
Poll::Ready(Err(err)) => {
if let Some(err) = self.socket.handle_write_error(err) {
return Poll::Ready(Err(err));
}
continue;
}
}
}
}
}

#[cfg(test)]
mod tests {
use testresult::TestResult;
Expand Down
Loading