Skip to content

Commit 177ad4c

Browse files
bors[bot]taiki-e
andauthored
Merge #15
15: Add async_try_stream to support ? operator in async stream r=taiki-e a=taiki-e `?` operator can be used with the `#[async_try_stream]` attribute and `async_try_stream_block!` macro. ```rust #![feature(generators)] use futures::stream::Stream; use futures_async_stream::try_async_stream; #[async_try_stream(ok = i32, error = Box<dyn std::error::Error + Send + Sync + 'static>)] async fn foo(stream: impl Stream<Item = String>) { #[for_await] for x in stream { yield x.parse()?; } } ``` Closes #2 Co-authored-by: Taiki Endo <[email protected]>
2 parents 000a45b + d69699d commit 177ad4c

File tree

13 files changed

+484
-58
lines changed

13 files changed

+484
-58
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ description = """
1313
Async stream for Rust and the futures crate.
1414
"""
1515

16+
[package.metadata.docs.rs]
17+
all-features = true
18+
1619
[workspace]
1720
members = ["futures-async-stream-macro"]
1821

README.md

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Add this to your `Cargo.toml`:
2727
```toml
2828
[dependencies]
2929
futures-async-stream = "0.1.0-alpha.5"
30-
futures-preview = "0.3.0-alpha.17"
30+
futures-preview = "0.3.0-alpha.18"
3131
```
3232

3333
The current futures-async-stream requires Rust nightly 2019-08-21 or later.
@@ -150,10 +150,27 @@ impl Foo for Bar {
150150
}
151151
```
152152

153+
## `#[async_try_stream]` and `async_try_stream_block!`
154+
155+
`?` operator can be used with the `#[async_try_stream]` and `async_try_stream_block!`. The `Item` of the returned stream is `Result` with `Ok` being the value yielded and `Err` the error type returned by `?` operator or `return Err(...)`.
156+
157+
```rust
158+
#![feature(generators)]
159+
use futures::stream::Stream;
160+
use futures_async_stream::async_try_stream;
161+
162+
#[async_try_stream(ok = i32, error = Box<dyn std::error::Error + Send + Sync>)]
163+
async fn foo(stream: impl Stream<Item = String>) {
164+
#[for_await]
165+
for x in stream {
166+
yield x.parse()?;
167+
}
168+
}
169+
```
170+
153171
<!--
154172
## List of features that may be added in the future as an extension of this feature:
155173
156-
* `async_try_stream` (https://github.com/rust-lang-nursery/futures-rs/pull/1548#discussion_r287558350)
157174
* `async_sink` (https://github.com/rust-lang-nursery/futures-rs/pull/1548#issuecomment-486205382)
158175
* Support `.await` in macro (https://github.com/rust-lang-nursery/futures-rs/pull/1548#discussion_r285341883)
159176
* Parallel version of `for_await` (https://github.com/rustasync/runtime/pull/25)
@@ -214,8 +231,7 @@ where
214231
type Item = i32;
215232

216233
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
217-
let this = self.project();
218-
if let Some(x) = ready!(this.stream.poll_next(cx)) {
234+
if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
219235
Poll::Ready(Some(x.parse().unwrap()))
220236
} else {
221237
Poll::Ready(None)

futures-async-stream-macro/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ description = """
1212
Definition of the `#[async_stream]` macro for the `futures-async-stream` crate as well as a few other assorted macros.
1313
"""
1414

15+
[package.metadata.docs.rs]
16+
all-features = true
17+
1518
[lib]
1619
proc-macro = true
1720

futures-async-stream-macro/src/lib.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod utils;
2222

2323
mod elision;
2424
mod stream;
25+
mod try_stream;
2526
mod visitor;
2627

2728
/// Processes streams using a for loop.
@@ -40,12 +41,25 @@ pub fn for_await(args: TokenStream, input: TokenStream) -> TokenStream {
4041
/// Creates streams via generators.
4142
#[proc_macro_attribute]
4243
pub fn async_stream(args: TokenStream, input: TokenStream) -> TokenStream {
43-
stream::async_stream(args.into(), input.into()).unwrap_or_else(|e| e.to_compile_error()).into()
44+
stream::attribute(args.into(), input.into()).unwrap_or_else(|e| e.to_compile_error()).into()
4445
}
4546

4647
/// Creates streams via generators.
4748
#[proc_macro]
4849
pub fn async_stream_block(input: TokenStream) -> TokenStream {
4950
let input = TokenStream::from(TokenTree::Group(Group::new(Delimiter::Brace, input)));
50-
stream::async_stream_block(input.into()).unwrap_or_else(|e| e.to_compile_error()).into()
51+
stream::block_macro(input.into()).unwrap_or_else(|e| e.to_compile_error()).into()
52+
}
53+
54+
/// Creates streams via generators.
55+
#[proc_macro_attribute]
56+
pub fn async_try_stream(args: TokenStream, input: TokenStream) -> TokenStream {
57+
try_stream::attribute(args.into(), input.into()).unwrap_or_else(|e| e.to_compile_error()).into()
58+
}
59+
60+
/// Creates streams via generators.
61+
#[proc_macro]
62+
pub fn async_try_stream_block(input: TokenStream) -> TokenStream {
63+
let input = TokenStream::from(TokenTree::Group(Group::new(Delimiter::Brace, input)));
64+
try_stream::block_macro(input.into()).unwrap_or_else(|e| e.to_compile_error()).into()
5165
}

futures-async-stream-macro/src/stream.rs

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
// =================================================================================================
1818
// async_stream
1919

20-
pub(super) fn async_stream(args: TokenStream, input: TokenStream) -> Result<TokenStream> {
20+
pub(super) fn attribute(args: TokenStream, input: TokenStream) -> Result<TokenStream> {
2121
parse_async_stream_fn(args, input)
2222
}
2323

@@ -38,9 +38,8 @@ impl Parse for Item {
3838
}
3939
}
4040

41-
// TODO: rename to `ReturnType`
4241
#[derive(Clone, Copy)]
43-
enum ReturnTypeKind {
42+
pub(super) enum ReturnTypeKind {
4443
// impl Stream<Item = ..> $(+ $lifetime)?
4544
Default,
4645
// Pin<Box<dyn Stream<Item = ..> (+ Send)? $(+ $lifetime)?>>
@@ -122,16 +121,16 @@ impl Parse for Args {
122121
}
123122
}
124123

125-
struct FnSig {
126-
attrs: Vec<Attribute>,
127-
vis: Visibility,
128-
sig: Signature,
129-
block: Block,
130-
semi: Option<Token![;]>,
124+
pub(super) struct FnSig {
125+
pub(super) attrs: Vec<Attribute>,
126+
pub(super) vis: Visibility,
127+
pub(super) sig: Signature,
128+
pub(super) block: Block,
129+
pub(super) semi: Option<Token![;]>,
131130
}
132131

133132
impl FnSig {
134-
fn parse(input: TokenStream, boxed: ReturnTypeKind) -> Result<Self> {
133+
pub(super) fn parse(input: TokenStream, boxed: ReturnTypeKind) -> Result<Self> {
135134
match boxed {
136135
ReturnTypeKind::Default => syn::parse2(input).map(ItemFn::into),
137136
ReturnTypeKind::Boxed { .. } => {
@@ -167,7 +166,7 @@ impl From<TraitItemMethod> for FnSig {
167166
}
168167
}
169168

170-
fn validate_async_stream_fn(item: &FnSig) -> Result<()> {
169+
pub(super) fn validate_async_stream_fn(item: &FnSig) -> Result<()> {
171170
if item.sig.asyncness.is_none() {
172171
return Err(error!(item.sig.fn_token, "async stream must be declared as async"));
173172
}
@@ -193,15 +192,7 @@ fn validate_async_stream_fn(item: &FnSig) -> Result<()> {
193192
Ok(())
194193
}
195194

196-
fn parse_async_stream_fn(args: TokenStream, input: TokenStream) -> Result<TokenStream> {
197-
let args: Args = syn::parse2(args)?;
198-
let item = FnSig::parse(input, args.boxed)?;
199-
200-
validate_async_stream_fn(&item)?;
201-
Ok(expand_async_stream_fn(item, &args))
202-
}
203-
204-
fn expand_async_body(inputs: Punctuated<FnArg, Comma>) -> (Vec<FnArg>, Vec<Local>) {
195+
pub(super) fn expand_async_body(inputs: Punctuated<FnArg, Comma>) -> (Vec<FnArg>, Vec<Local>) {
205196
let mut arguments: Vec<FnArg> = Vec::new();
206197
let mut statements: Vec<Local> = Vec::new();
207198

@@ -274,7 +265,13 @@ fn expand_async_body(inputs: Punctuated<FnArg, Comma>) -> (Vec<FnArg>, Vec<Local
274265
(arguments, statements)
275266
}
276267

277-
fn make_gen_body(statements: &[Local], block: &Block, gen_function: &TokenStream) -> TokenStream {
268+
pub(super) fn make_gen_body(
269+
statements: &[Local],
270+
block: &Block,
271+
gen_function: &TokenStream,
272+
ret_value: &TokenStream,
273+
ret_ty: &TokenStream,
274+
) -> TokenStream {
278275
let block_inner = quote! {
279276
#(#statements)*
280277
#block
@@ -292,7 +289,7 @@ fn make_gen_body(statements: &[Local], block: &Block, gen_function: &TokenStream
292289
// have any `yield` statements.
293290
#[allow(unreachable_code)]
294291
{
295-
return;
292+
return #ret_value;
296293
loop { yield ::futures_async_stream::reexport::task::Poll::Pending }
297294
}
298295
};
@@ -302,10 +299,18 @@ fn make_gen_body(statements: &[Local], block: &Block, gen_function: &TokenStream
302299
});
303300

304301
quote! {
305-
#gen_function (static move || -> () #gen_body)
302+
#gen_function(static move || -> #ret_ty #gen_body)
306303
}
307304
}
308305

306+
fn parse_async_stream_fn(args: TokenStream, input: TokenStream) -> Result<TokenStream> {
307+
let args: Args = syn::parse2(args)?;
308+
let item = FnSig::parse(input, args.boxed)?;
309+
310+
validate_async_stream_fn(&item)?;
311+
Ok(expand_async_stream_fn(item, &args))
312+
}
313+
309314
fn expand_async_stream_fn(item: FnSig, args: &Args) -> TokenStream {
310315
let FnSig { attrs, vis, sig, mut block, semi } = item;
311316
let Signature { unsafety, abi, fn_token, ident, mut generics, inputs, .. } = sig;
@@ -324,7 +329,7 @@ fn expand_async_stream_fn(item: FnSig, args: &Args) -> TokenStream {
324329
let output_span = first_last(item);
325330
let gen_function = quote!(::futures_async_stream::stream::from_generator);
326331
let gen_function = respan(gen_function, output_span);
327-
let mut body_inner = make_gen_body(&statements, &block, &gen_function);
332+
let mut body_inner = make_gen_body(&statements, &block, &gen_function, &quote!(), &quote!(()));
328333

329334
if let ReturnTypeKind::Boxed { .. } = args.boxed {
330335
let body = quote! { ::futures_async_stream::reexport::boxed::Box::pin(#body_inner) };
@@ -344,15 +349,15 @@ fn expand_async_stream_fn(item: FnSig, args: &Args) -> TokenStream {
344349
// Raw `impl` breaks syntax highlighting in some editors.
345350
let impl_token = token::Impl::default();
346351
quote! {
347-
#impl_token ::futures_async_stream::stream::Stream<Item = #item> + #(#lifetimes +)*
352+
#impl_token ::futures_async_stream::reexport::Stream<Item = #item> + #(#lifetimes +)*
348353
}
349354
}
350355
ReturnTypeKind::Boxed { send } => {
351356
let send = if send { Some(quote!(+ Send)) } else { None };
352357
quote! {
353358
::futures_async_stream::reexport::pin::Pin<
354359
::futures_async_stream::reexport::boxed::Box<
355-
dyn ::futures_async_stream::stream::Stream<Item = #item> #send + #(#lifetimes +)*
360+
dyn ::futures_async_stream::reexport::Stream<Item = #item> #send + #(#lifetimes +)*
356361
>
357362
>
358363
}
@@ -375,13 +380,13 @@ fn expand_async_stream_fn(item: FnSig, args: &Args) -> TokenStream {
375380
// =================================================================================================
376381
// async_stream_block
377382

378-
pub(super) fn async_stream_block(input: TokenStream) -> Result<TokenStream> {
383+
pub(super) fn block_macro(input: TokenStream) -> Result<TokenStream> {
379384
syn::parse2(input).map(expand_async_stream_block)
380385
}
381386

382387
fn expand_async_stream_block(mut expr: Expr) -> TokenStream {
383388
Visitor::new(Stream).visit_expr_mut(&mut expr);
384389

385390
let gen_function = quote!(::futures_async_stream::stream::from_generator);
386-
make_gen_body(&[], &block(vec![Stmt::Expr(expr)]), &gen_function)
391+
make_gen_body(&[], &block(vec![Stmt::Expr(expr)]), &gen_function, &quote!(), &quote!(()))
387392
}

0 commit comments

Comments
 (0)