Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeouts for blocking IO #4

Open
kevinmehall opened this issue Oct 29, 2023 · 4 comments
Open

Timeouts for blocking IO #4

kevinmehall opened this issue Oct 29, 2023 · 4 comments
Labels
feature-request New feature or request

Comments

@kevinmehall
Copy link
Owner

For Rust async, it's normal for futures not to have their own timeouts. To implement a timeout you race the future with a timeout future such that the main future will be cancelled if the timeout completes first. But this requires a reactor to provide timer functionality as a generic future, and I want to make nusb not require pulling in tokio or async-std for the common case where you don't actually need async in the rest of the app.

futures_lite::future::block_on is implemented on top of parking, which has an API with a timeout. So we could see if futures_lite would add a timeout (seems like many uses of blocking on async IO would want this?), find another implementation that has this to recommend, or copy that code here.

@XVilka
Copy link

XVilka commented Jun 24, 2024

It is definitely useful, given that libusb-rs provided this feature for transactions.

tuna-f1sh pushed a commit to tuna-f1sh/nusb that referenced this issue Sep 26, 2024
api: add public API helpers for `UdevDevice`
@wuwbobo2021
Copy link

wuwbobo2021 commented Nov 10, 2024

Inspired by https://docs.rs/probe-rs/latest/src/probe_rs/probe/usb_util.rs.html (which probably discards possible data on timeout), I wrote and tested a prototype of synchronous reader for nusb, but I'm not sure if it is the correct implementation. And I can't unify synchronous read and write operations into a wrapper of Queue externally, because the generic type restrictions transfer::internal::TransferRequest and platform::TransferData: PlatformSubmit<R> are held privately inside nusb.

use std::{future::Future, io::{Error, ErrorKind}, time::Duration};
use futures_lite::{FutureExt, future::block_on};

use nusb::transfer::{Queue, RequestBuffer, TransferError};
type ReadQueue = Queue<RequestBuffer>;
type WriteQueue = Queue<Vec<u8>>;

fn wait_timeout<T>(fut: impl Future<Output = T>, dur: Duration) -> Option<T> {
    let fut_comp = async { Some(fut.await) };
    let fut_cancel = async {
        futures_timer::Delay::new(dur).await;
        None
    };
    block_on(fut_comp.or(fut_cancel))
}

/// Synchronous wrapper of a `nusb` IN transfer queue.
pub struct SyncReader {
    queue: ReadQueue,
    buf: Option<Vec<u8>>,
}
impl SyncReader {
    /// Wraps the asynchronous queue.
    pub fn new(queue: ReadQueue) -> Self {
        Self {
            queue,
            buf: Some(Vec::new()),
        }
    }
    /// It is similar to `read()` in the standard `Read` trait, requiring timeout parameter.
    pub fn read(&mut self, buf: &mut [u8], timeout: Duration) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let buf_async = self.buf.take().unwrap();
        // Safety: `RequestBuffer::reuse()` may reserve larger capacity to reach buf.len()
        let req = nusb::transfer::RequestBuffer::reuse(buf_async, buf.len());

        self.queue.submit(req);
        let fut = self.queue.next_complete();
        let comp = {
            let mut maybe_comp = wait_timeout(fut, timeout);
            if maybe_comp.is_none() {
                self.queue.cancel_all(); // the only one
                if self.queue.pending() == 0 {
                    self.buf.replace(Vec::new());
                    return Err(Error::other("Unable to get the transfer result"));
                }
                let comp = block_on(self.queue.next_complete());
                maybe_comp.replace(comp);
            }
            maybe_comp.unwrap()
        };
        let len_reveived = comp.data.len();

        let result = match comp.status {
            Ok(()) => {
                buf[..len_reveived].copy_from_slice(&comp.data);
                Ok(len_reveived)
            }
            Err(TransferError::Cancelled) => {
                if len_reveived > 0 {
                    buf[..len_reveived].copy_from_slice(&comp.data);
                    Ok(len_reveived)
                } else {
                    Err(Error::from(ErrorKind::TimedOut))
                }
            }
            Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
            Err(TransferError::Stall) => {
                let _ = self.queue.clear_halt();
                Err(Error::other(TransferError::Stall))
            }
            Err(e) => Err(Error::other(e)),
        };
        self.buf.replace(comp.data);
        result
    }
}

If copy_from_slice() should be avoided, I'll probably do so:

// replaces `let buf_async = self.buf.take().unwrap();`
let buf_async = unsafe { Vec::from_raw_parts(buf.as_mut_ptr(), buf.len(), buf.len()) };
// ...
// replaces `self.buf.replace(comp.data);`
let _ = comp.data.leak();

I think it's highly unsafe, but it might work. Do you think it's acceptable?

@wuwbobo2021
Copy link

Sorry, I've reviewed my previous code and found a bad mistake: copy_from_slice() will panic because it was used wrongly, so I hid it as spam for a while. The current code is tested by myself.

Will this feature be added any time soon?

@wuwbobo2021
Copy link

I've changed my code again to use the futures_timer crate, it doesn't introduce any extra dependency by default.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature-request New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants