Skip to content

Commit c35e035

Browse files
authored
Merge pull request #32 from Nemo157/stream
#[async_stream] implementation
2 parents f6a9c6a + 82fc83d commit c35e035

20 files changed

+588
-58
lines changed

README.md

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ fn foo() -> io::Result<i32> {
6464
}
6565
```
6666

67-
And finally you can also have "async `for` loops" which operate over the
68-
[`Stream`] trait:
67+
You can also have "async `for` loops" which operate over the [`Stream`] trait:
6968

7069
```rust
7170
#[async]
@@ -78,6 +77,32 @@ An async `for` loop will propagate errors out of the function so `message` has
7877
the `Item` type of the `stream` passed in. Note that async `for` loops can only
7978
be used inside of an `#[async]` function.
8079

80+
And finally, you can create a `Stream` instead of a `Future` via
81+
`#[async_stream(item = _)]`:
82+
83+
```rust
84+
#[async]
85+
fn fetch(client: hyper::Client, url: &'static str) -> io::Result<String> {
86+
// ...
87+
}
88+
89+
/// Fetch all provided urls one at a time
90+
#[async_stream(item = String)]
91+
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
92+
for url in urls {
93+
let s = await!(fetch(client, url))?;
94+
stream_yield!(s);
95+
}
96+
Ok(())
97+
}
98+
```
99+
100+
`#[async_stream]` must have an item type specified via `item = some::Path` and
101+
the values output from the stream must be wrapped into a `Result` and yielded
102+
via the `stream_yield!` macro. This macro also supports the same features as
103+
`#[async]`, an additional `boxed` argument to return a `Box<Stream>`, async
104+
`for` loops, etc.
105+
81106
[`Future`]: https://docs.rs/futures/0.1.13/futures/future/trait.Future.html
82107
[`Stream`]: https://docs.rs/futures/0.1.13/futures/stream/trait.Stream.html
83108

futures-async-macro/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,7 @@ proc-macro2 = { version = "0.1", features = ["unstable"] }
1414
git = 'https://github.com/dtolnay/syn'
1515
features = ["full", "fold", "parsing", "printing"]
1616
default-features = false
17+
18+
[dependencies.synom]
19+
git = 'https://github.com/dtolnay/syn'
20+
default-features = false

futures-async-macro/src/lib.rs

Lines changed: 178 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,23 @@ extern crate proc_macro;
1818
#[macro_use]
1919
extern crate quote;
2020
extern crate syn;
21+
#[macro_use]
22+
extern crate synom;
2123

2224
use proc_macro2::Span;
2325
use proc_macro::{TokenStream, TokenTree, Delimiter, TokenNode};
2426
use quote::{Tokens, ToTokens};
2527
use syn::*;
28+
use syn::delimited::Delimited;
2629
use syn::fold::Folder;
2730

28-
#[proc_macro_attribute]
29-
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
30-
// Handle arguments to the #[async] attribute, if any
31-
let attribute = attribute.to_string();
32-
let boxed = if attribute == "( boxed )" {
33-
true
34-
} else if attribute == "" {
35-
false
36-
} else {
37-
panic!("the #[async] attribute currently only takes `boxed` as an arg");
38-
};
39-
31+
fn async_inner<F>(
32+
boxed: bool,
33+
function: TokenStream,
34+
gen_function: Tokens,
35+
return_ty: F)
36+
-> TokenStream
37+
where F: FnOnce(&Ty) -> proc_macro2::TokenStream {
4038
// Parse our item, expecting a function. This function may be an actual
4139
// top-level function or it could be a method (typically dictated by the
4240
// arguments). We then extract everything we'd like to use.
@@ -164,30 +162,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
164162
// Basically just take all those expression and expand them.
165163
let block = ExpandAsyncFor.fold_block(*block);
166164

167-
// TODO: can we lift the restriction that `futures` must be at the root of
168-
// the crate?
169-
170-
let output_span = first_last(&output);
171-
let return_ty = if boxed {
172-
quote! {
173-
Box<::futures::Future<
174-
Item = <! as ::futures::__rt::IsResult>::Ok,
175-
Error = <! as ::futures::__rt::IsResult>::Err,
176-
>>
177-
}
178-
} else {
179-
// Dunno why this is buggy, hits weird typecheck errors in tests
180-
//
181-
// quote! {
182-
// impl ::futures::Future<
183-
// Item = <#output as ::futures::__rt::MyTry>::MyOk,
184-
// Error = <#output as ::futures::__rt::MyTry>::MyError,
185-
// >
186-
// }
187-
quote! { impl ::futures::__rt::MyFuture<!> + 'static }
188-
};
189-
let return_ty = respan(return_ty.into(), &output_span);
190-
let return_ty = replace_bang(return_ty, &output);
165+
let return_ty = return_ty(&output);
191166

192167
let block_inner = quote! {
193168
#( let #patterns = #temp_bindings; )*
@@ -207,7 +182,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
207182
#[allow(unreachable_code)]
208183
{
209184
return __e;
210-
loop { yield }
185+
loop { yield ::futures::Async::NotReady }
211186
}
212187
};
213188
let mut gen_body = Tokens::new();
@@ -218,7 +193,7 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
218193
// Give the invocation of the `gen` function the same span as the output
219194
// as currently errors related to it being a result are targeted here. Not
220195
// sure if more errors will highlight this function call...
221-
let gen_function = quote! { ::futures::__rt::gen };
196+
let output_span = first_last(&output);
222197
let gen_function = respan(gen_function.into(), &output_span);
223198
let body_inner = quote! {
224199
#gen_function (move || #gen_body)
@@ -247,6 +222,99 @@ pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
247222
output.into()
248223
}
249224

225+
#[proc_macro_attribute]
226+
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
227+
// Handle arguments to the #[async] attribute, if any
228+
let attribute = attribute.to_string();
229+
let boxed = if attribute == "( boxed )" {
230+
true
231+
} else if attribute == "" {
232+
false
233+
} else {
234+
panic!("the #[async] attribute currently only takes `boxed` as an arg");
235+
};
236+
237+
async_inner(boxed, function, quote! { ::futures::__rt::gen }, |output| {
238+
// TODO: can we lift the restriction that `futures` must be at the root of
239+
// the crate?
240+
let output_span = first_last(&output);
241+
let return_ty = if boxed {
242+
quote! {
243+
Box<::futures::Future<
244+
Item = <! as ::futures::__rt::IsResult>::Ok,
245+
Error = <! as ::futures::__rt::IsResult>::Err,
246+
>>
247+
}
248+
} else {
249+
// Dunno why this is buggy, hits weird typecheck errors in tests
250+
//
251+
// quote! {
252+
// impl ::futures::Future<
253+
// Item = <#output as ::futures::__rt::MyTry>::MyOk,
254+
// Error = <#output as ::futures::__rt::MyTry>::MyError,
255+
// >
256+
// }
257+
quote! { impl ::futures::__rt::MyFuture<!> + 'static }
258+
};
259+
let return_ty = respan(return_ty.into(), &output_span);
260+
replace_bang(return_ty, &output)
261+
})
262+
}
263+
264+
#[proc_macro_attribute]
265+
pub fn async_stream(attribute: TokenStream, function: TokenStream) -> TokenStream {
266+
// Handle arguments to the #[async_stream] attribute, if any
267+
let args = syn::parse::<AsyncStreamArgs>(attribute)
268+
.expect("failed to parse attribute arguments");
269+
270+
let mut boxed = false;
271+
let mut item_ty = None;
272+
273+
for arg in args.0 {
274+
match arg {
275+
AsyncStreamArg(term, None) => {
276+
if term == "boxed" {
277+
if boxed {
278+
panic!("duplicate 'boxed' argument to #[async_stream]");
279+
}
280+
boxed = true;
281+
} else {
282+
panic!("unexpected #[async_stream] argument '{}'", term);
283+
}
284+
}
285+
AsyncStreamArg(term, Some(ty)) => {
286+
if term == "item" {
287+
if item_ty.is_some() {
288+
panic!("duplicate 'item' argument to #[async_stream]");
289+
}
290+
item_ty = Some(ty);
291+
} else {
292+
panic!("unexpected #[async_stream] argument '{}'", quote!(#term = #ty));
293+
}
294+
}
295+
}
296+
}
297+
298+
let boxed = boxed;
299+
let item_ty = item_ty.expect("#[async_stream] requires item type to be specified");
300+
301+
async_inner(boxed, function, quote! { ::futures::__rt::gen_stream }, |output| {
302+
let output_span = first_last(&output);
303+
let return_ty = if boxed {
304+
quote! {
305+
Box<::futures::Stream<
306+
Item = !,
307+
Error = <! as ::futures::__rt::IsResult>::Err,
308+
>>
309+
}
310+
} else {
311+
quote! { impl ::futures::__rt::MyStream<!, !> + 'static }
312+
};
313+
let return_ty = respan(return_ty.into(), &output_span);
314+
replace_bangs(return_ty, &[&item_ty, &output])
315+
})
316+
}
317+
250318
#[proc_macro]
251319
pub fn async_block(input: TokenStream) -> TokenStream {
252320
let input = TokenStream::from(TokenTree {
@@ -268,7 +336,40 @@ pub fn async_block(input: TokenStream) -> TokenStream {
268336
syn::tokens::Move(span).to_tokens(tokens);
269337
syn::tokens::OrOr([span, span]).to_tokens(tokens);
270338
syn::tokens::Brace(span).surround(tokens, |tokens| {
271-
(quote! { if false { yield } }).to_tokens(tokens);
339+
(quote! {
340+
if false { yield ::futures::Async::NotReady }
341+
}).to_tokens(tokens);
342+
expr.to_tokens(tokens);
343+
});
344+
});
345+
346+
tokens.into()
347+
}
348+
349+
#[proc_macro]
350+
pub fn async_stream_block(input: TokenStream) -> TokenStream {
351+
let input = TokenStream::from(TokenTree {
352+
kind: TokenNode::Group(Delimiter::Brace, input),
353+
span: Default::default(),
354+
});
355+
let expr = syn::parse(input)
356+
.expect("failed to parse tokens as an expression");
357+
let expr = ExpandAsyncFor.fold_expr(expr);
358+
359+
let mut tokens = quote! {
360+
::futures::__rt::gen_stream
361+
};
362+
363+
// Use some manual token construction here instead of `quote!` to ensure
364+
// that we get the `call_site` span instead of the default span.
365+
let span = syn::Span(Span::call_site());
366+
syn::tokens::Paren(span).surround(&mut tokens, |tokens| {
367+
syn::tokens::Move(span).to_tokens(tokens);
368+
syn::tokens::OrOr([span, span]).to_tokens(tokens);
369+
syn::tokens::Brace(span).surround(tokens, |tokens| {
370+
(quote! {
371+
if false { yield ::futures::Async::NotReady }
372+
}).to_tokens(tokens);
272373
expr.to_tokens(tokens);
273374
});
274375
});
@@ -311,7 +412,7 @@ impl Folder for ExpandAsyncFor {
311412
}
312413
}
313414
futures_await::Async::NotReady => {
314-
yield;
415+
yield futures_await::Async::NotReady;
315416
continue
316417
}
317418
}
@@ -362,3 +463,40 @@ fn replace_bang(input: proc_macro2::TokenStream, tokens: &ToTokens)
362463
}
363464
new_tokens.into()
364465
}
466+
467+
fn replace_bangs(input: proc_macro2::TokenStream, replacements: &[&ToTokens])
468+
-> proc_macro2::TokenStream
469+
{
470+
let mut replacements = replacements.iter().cycle();
471+
let mut new_tokens = Tokens::new();
472+
for token in input.into_iter() {
473+
match token.kind {
474+
proc_macro2::TokenNode::Op('!', _) => {
475+
replacements.next().unwrap().to_tokens(&mut new_tokens);
476+
}
477+
_ => token.to_tokens(&mut new_tokens),
478+
}
479+
}
480+
new_tokens.into()
481+
}
482+
483+
struct AsyncStreamArg(syn::Ident, Option<syn::Ty>);
484+
485+
impl synom::Synom for AsyncStreamArg {
486+
named!(parse -> Self, do_parse!(
487+
i: syn!(syn::Ident) >>
488+
p: option!(do_parse!(
489+
syn!(syn::tokens::Eq) >>
490+
p: syn!(syn::Ty) >>
491+
(p))) >>
492+
(AsyncStreamArg(i, p))));
493+
}
494+
495+
struct AsyncStreamArgs(Vec<AsyncStreamArg>);
496+
497+
impl synom::Synom for AsyncStreamArgs {
498+
named!(parse -> Self, map!(
499+
option!(parens!(call!(Delimited::<AsyncStreamArg, syn::tokens::Comma>::parse_separated_nonempty))),
500+
|p| AsyncStreamArgs(p.map(|d| d.0.into_vec()).unwrap_or_default())
501+
));
502+
}

futures-await-macro/src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,18 @@ macro_rules! await {
2222
break ::futures::__rt::Err(e)
2323
}
2424
}
25-
yield
25+
yield ::futures::Async::NotReady
2626
}
2727
})
2828
}
29+
30+
// TODO: This macro needs to use an extra temporary variable because of
31+
// rust-lang/rust#44197, once that's fixed this should just use $e directly
32+
// inside the yield expression
33+
#[macro_export]
34+
macro_rules! stream_yield {
35+
($e:expr) => ({
36+
let e = $e;
37+
yield ::futures::Async::Ready(e)
38+
})
39+
}

0 commit comments

Comments
 (0)