Skip to content

Commit dd35b1b

Browse files
Nemo157cramertj
authored andcommitted
Test non-obvious fused behaviour
FuturesUnordered can reset from being terminated when it receives new tasks to process. SelectNextSome must handle this to ensure the example pattern of pushing tasks in a looped `select!` works.
1 parent 03d798f commit dd35b1b

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#![feature(async_await, await_macro, futures_api, pin)]
2+
3+
use futures::future;
4+
use futures::task::Poll;
5+
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
6+
use futures_test::task::noop_local_waker_ref;
7+
8+
#[test]
9+
fn is_terminated() {
10+
let lw = noop_local_waker_ref();
11+
let mut tasks = FuturesUnordered::new();
12+
13+
assert_eq!(tasks.is_terminated(), false);
14+
assert_eq!(tasks.poll_next_unpin(lw), Poll::Ready(None));
15+
assert_eq!(tasks.is_terminated(), true);
16+
17+
tasks.push(future::ready(1));
18+
19+
assert_eq!(tasks.is_terminated(), false);
20+
assert_eq!(tasks.poll_next_unpin(lw), Poll::Ready(Some(1)));
21+
assert_eq!(tasks.is_terminated(), false);
22+
assert_eq!(tasks.poll_next_unpin(lw), Poll::Ready(None));
23+
assert_eq!(tasks.is_terminated(), true);
24+
}

futures-util/tests/select_next_some.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,37 @@
11
#![feature(async_await, await_macro, futures_api, pin)]
22

33
use futures::{future, select};
4-
use futures::stream::FuturesUnordered;
5-
use futures::stream::StreamExt;
4+
use futures::future::{FusedFuture, FutureExt};
5+
use futures::stream::{FuturesUnordered, StreamExt};
6+
use futures::task::Poll;
67
use futures_test::future::FutureTestExt;
8+
use futures_test::task::WakeCounter;
79

810
#[test]
9-
fn futures_unordered() {
11+
fn is_terminated() {
12+
let counter = WakeCounter::new();
13+
14+
let mut tasks = FuturesUnordered::new();
15+
16+
let mut select_next_some = tasks.select_next_some();
17+
assert_eq!(select_next_some.is_terminated(), false);
18+
assert_eq!(select_next_some.poll_unpin(counter.local_waker()), Poll::Pending);
19+
assert_eq!(counter.count(), 1);
20+
assert_eq!(select_next_some.is_terminated(), true);
21+
drop(select_next_some);
22+
23+
tasks.push(future::ready(1));
24+
25+
let mut select_next_some = tasks.select_next_some();
26+
assert_eq!(select_next_some.is_terminated(), false);
27+
assert_eq!(select_next_some.poll_unpin(counter.local_waker()), Poll::Ready(1));
28+
assert_eq!(select_next_some.is_terminated(), false);
29+
assert_eq!(select_next_some.poll_unpin(counter.local_waker()), Poll::Pending);
30+
assert_eq!(select_next_some.is_terminated(), true);
31+
}
32+
33+
#[test]
34+
fn select() {
1035
// Checks that even though `async_tasks` will yield a `None` and return
1136
// `is_terminated() == true` during the first poll, it manages to toggle
1237
// back to having items after a future is pushed into it during the second

futures/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ pub mod future {
174174
//! immediate defined value.
175175
176176
pub use futures_core::future::{
177-
Future, TryFuture,
177+
Future, TryFuture, FusedFuture,
178178
FutureObj, LocalFutureObj, UnsafeFutureObj,
179179
};
180180

@@ -295,8 +295,8 @@ pub mod stream {
295295
//! constructs a stream from a collection of futures.
296296
297297
pub use futures_core::stream::{
298-
Stream, TryStream,
299-
StreamObj, LocalStreamObj, UnsafeStreamObj
298+
Stream, TryStream, FusedStream,
299+
StreamObj, LocalStreamObj, UnsafeStreamObj,
300300
};
301301

302302
pub use futures_util::stream::{

0 commit comments

Comments
 (0)