Can't use ? if try_stream! wraps select! #63

Open
stusmall opened this issue Aug 26, 2021 · 4 comments · May be fixed by #74

Comments

@stusmall

I'm joining multiple streams in a select! along with a timeout future, which is how I ran into this. I boiled it down to a minimal code sample that triggers the problem:

use std::io;
use std::future::Future;
use socket2::{Socket, Domain, Type, Protocol};
use futures::Stream;
use async_stream::try_stream;
use futures::select;
use tokio::time::sleep;
use std::time::Duration;
use futures::FutureExt;
use futures::StreamExt;

fn await_results(
    mut stream1: impl Stream<Item = io::Result<i32>> + Unpin,
) -> impl Stream<Item = io::Result<i32>> {
    try_stream! {
        let mut stream1_future = stream1.next().fuse();
        select!{
            result = stream1_future => {
                stream1_future = stream1.next().fuse();
                match result {
                    Some(x) => {
                        let y = x?;
                        yield y;
                    }
                    None => {

                    }
                }
            }
        }
    }
}


#[tokio::main]
async fn main() -> io::Result<()> {
    Ok(())
}

This fails with the error:

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
  --> src/main.rs:22:33
   |
15 | /     try_stream! {
16 | |         let mut stream1_future = stream1.next().fuse();
17 | |         select!{
18 | |             result = stream1_future => {
...  |
22 | |                         let y = x?;
   | |                                 ^^ cannot use the `?` operator in an async block that returns `()`
...  |
30 | |         }
31 | |     }
   | |_____- this function should return `Result` or `Option` to accept `?`
   |
   = help: the trait `Try` is not implemented for `()`
   = note: required by `from_error`

I confirmed I'm on 0.3.2, so I have the fix for #27.

@stusmall
Author

I can easily work around this by using stream! rather than try_stream!. So far it looks like I can still get my job done, just without the niceness of ?.
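
For anyone taking the same route, here is a rough std-only sketch of what has to be written by hand once ? is gone: match on each item, pass the Err through as the last item, and stop. It uses a plain Iterator in place of a Stream so it runs without any crates, and forward_until_error is a made-up name for illustration, not part of async-stream. Inside stream! the same match would use yield and break instead of returning Some/None.

```rust
use std::io;

/// Forwards items from `source` and stops after the first error.
/// This mirrors by hand what `?` would otherwise do for us:
/// hand the `Err` to the consumer, then end.
/// (Hypothetical helper for illustration; Iterator stands in for Stream.)
fn forward_until_error(
    mut source: impl Iterator<Item = io::Result<i32>>,
) -> impl Iterator<Item = io::Result<i32>> {
    let mut done = false;
    std::iter::from_fn(move || {
        if done {
            return None;
        }
        match source.next() {
            // Success: pass the value through unchanged.
            Some(Ok(y)) => Some(Ok(y)),
            // Failure: emit the error once, then finish on the next call.
            Some(Err(e)) => {
                done = true;
                Some(Err(e))
            }
            // Source exhausted.
            None => {
                done = true;
                None
            }
        }
    })
}
```

Items after the first error are never pulled from the source.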

@taiki-e
Member

taiki-e commented Aug 26, 2021

This seems to be because visit_token_stream_impl only visits yield, not ?.

fn visit_token_stream_impl(
    visitor: &mut Scrub<'_>,
    tokens: TokenStream2,
    modified: &mut bool,
    out: &mut TokenStream2,
) {
    use quote::ToTokens;
    use quote::TokenStreamExt;

    let mut tokens = tokens.into_iter().peekable();
    while let Some(tt) = tokens.next() {
        match tt {
            TokenTree::Ident(i) if i == "yield" => {
                let stream = std::iter::once(TokenTree::Ident(i)).chain(tokens).collect();
                match syn::parse2(stream) {
                    Ok(Partial(yield_expr, rest)) => {
                        let mut expr = syn::Expr::Yield(yield_expr);
                        visitor.visit_expr_mut(&mut expr);
                        expr.to_tokens(out);
                        *modified = true;
                        tokens = rest.into_iter().peekable();
                    }
                    Err(e) => {
                        out.append_all(&mut e.to_compile_error().into_iter());
                        *modified = true;
                        return;
                    }
                }
            }
            TokenTree::Ident(i) if i == "stream" || i == "try_stream" => {
                out.append(TokenTree::Ident(i));
                match tokens.peek() {
                    Some(TokenTree::Punct(p)) if p.as_char() == '!' => {
                        out.extend(tokens.next()); // !
                        if let Some(TokenTree::Group(_)) = tokens.peek() {
                            out.extend(tokens.next()); // { .. } or [ .. ] or ( .. )
                        }
                    }
                    _ => {}
                }
            }
            TokenTree::Group(group) => {
                let mut content = group.stream();
                *modified |= visitor.visit_token_stream(&mut content);
                let mut new = Group::new(group.delimiter(), content);
                new.set_span(group.span());
                out.append(new);
            }
            other => out.append(other),
        }
    }
}
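
To make the effect concrete, here is a toy model of that walk. It is only a sketch: Tok, scrub_yield_only and the __rewritten_yield marker are invented for illustration and are not the crate's types (the real code works on proc_macro2 token trees and parses a full yield expression). The point is just that a walk which special-cases the yield identifier and recurses into groups passes a ? token inside a nested macro body through untouched.

```rust
/// Simplified stand-in for a token tree (hypothetical, not proc_macro2).
#[derive(Debug, Clone, PartialEq)]
enum Tok {
    Ident(String),
    Punct(char),
    Group(Vec<Tok>),
}

/// Walks the tokens the way the snippet above does in spirit:
/// `yield` is rewritten, groups are visited recursively,
/// and everything else (including `?`) is copied as-is.
fn scrub_yield_only(tokens: &[Tok]) -> Vec<Tok> {
    tokens
        .iter()
        .map(|tok| match tok {
            // The only token that gets special treatment.
            Tok::Ident(name) if name.as_str() == "yield" => {
                Tok::Ident("__rewritten_yield".to_string())
            }
            // Descend into `{ .. }`, `[ .. ]` or `( .. )`.
            Tok::Group(inner) => Tok::Group(scrub_yield_only(inner)),
            // `?` falls through here, so it is never rewritten.
            other => other.clone(),
        })
        .collect()
}
```

Feeding it the tokens of something like select! { x?; yield y } gives back a group in which yield has been replaced but ? is still a bare ?, which matches the error above: that ? ends up applying to the surrounding async block, which returns ().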

@SabrinaJewson linked a pull request on May 25, 2022 that will close this issue

@ashtuchkin

Just bumped into this. I'm using the latest version, v0.3.5.

@lewisbelcher

lewisbelcher commented Oct 23, 2024

I've also just run into this issue on version 0.3.6.

This compiles fine:

use async_stream::try_stream;
use futures::Stream;

async fn add(x: u8) -> Result<u8, String> {
  Ok(x + 1)
}

fn foo() -> impl Stream<Item = Result<u8, String>> {
  try_stream! {
    loop {
      tokio::select! {
        a = add(1) => { a },
        b = add(2) => { b },
      }?; // ? used here outside the `select!` macro
      yield 1;
    }
  }
}

whereas moving the ? operator to inside the select! handler block:

fn foo() -> impl Stream<Item = Result<u8, String>> {
  try_stream! {
    loop {
      tokio::select! {
        a = add(1) => { a? },
        b = add(2) => { b? },
      };
      yield 1;
    }
  }
}

results in an error similar to the one in the original description.

While it's possible to work around it by following the first pattern, it can become quite messy in a non-trivial situation.
