From d59b8a32df89ae7d132dfff919efc920fbddebc2 Mon Sep 17 00:00:00 2001 From: Sam Rijs Date: Sun, 22 Oct 2017 14:32:49 +1100 Subject: [PATCH] Implement FuturesUnordered::iter_mut --- src/stream/futures_unordered.rs | 67 +++++++++++++++++++++------------ src/stream/mod.rs | 28 +++++++++++++- tests/futures_unordered.rs | 42 +++++++++++++++++++++ 3 files changed, 111 insertions(+), 26 deletions(-) diff --git a/src/stream/futures_unordered.rs b/src/stream/futures_unordered.rs index 632bd83114..2940fd3495 100644 --- a/src/stream/futures_unordered.rs +++ b/src/stream/futures_unordered.rs @@ -1,3 +1,5 @@ +//! An unbounded set of futures. + use std::cell::UnsafeCell; use std::fmt::{self, Debug}; use std::iter::FromIterator; @@ -9,7 +11,7 @@ use std::sync::atomic::{AtomicPtr, AtomicBool}; use std::sync::{Arc, Weak}; use std::usize; -use {task, Stream, Future, Poll, Async, IntoFuture}; +use {task, Stream, Future, Poll, Async}; use executor::{Notify, UnsafeNotify, NotifyHandle}; use task_impl::{self, AtomicTask}; @@ -51,29 +53,6 @@ pub struct FuturesUnordered { unsafe impl Send for FuturesUnordered {} unsafe impl Sync for FuturesUnordered {} -/// Converts a list of futures into a `Stream` of results from the futures. -/// -/// This function will take an list of futures (e.g. a vector, an iterator, -/// etc), and return a stream. The stream will yield items as they become -/// available on the futures internally, in the order that they become -/// available. This function is similar to `buffer_unordered` in that it may -/// return items in a different order than in the list specified. -/// -/// Note that the returned set can also be used to dynamically push more -/// futures into the set as they become available. -pub fn futures_unordered(futures: I) -> FuturesUnordered<::Future> - where I: IntoIterator, - I::Item: IntoFuture -{ - let mut set = FuturesUnordered::new(); - - for future in futures { - set.push(future.into_future()); - } - - return set -} - // FuturesUnordered is implemented using two linked lists. One which links all // futures managed by a `FuturesUnordered` and one that tracks futures that have // been scheduled for polling. The first linked list is not thread safe and is @@ -207,6 +186,15 @@ impl FuturesUnordered { self.inner.enqueue(ptr); } + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> IterMut { + IterMut { + node: self.head_all, + len: self.len, + _marker: PhantomData + } + } + fn release_node(&mut self, node: Arc>) { // The future is done, try to reset the queued flag. This will prevent // `notify` from doing any work in the future @@ -440,6 +428,37 @@ impl FromIterator for FuturesUnordered { } } +#[derive(Debug)] +/// Mutable iterator over all futures in the unordered set. +pub struct IterMut<'a, F: 'a> { + node: *const Node, + len: usize, + _marker: PhantomData<&'a mut FuturesUnordered> +} + +impl<'a, F> Iterator for IterMut<'a, F> { + type Item = &'a mut F; + + fn next(&mut self) -> Option<&'a mut F> { + if self.node.is_null() { + return None; + } + unsafe { + let future = (*(*self.node).future.get()).as_mut().unwrap(); + let next = *(*self.node).next_all.get(); + self.node = next; + self.len -= 1; + return Some(future); + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl<'a, F> ExactSizeIterator for IterMut<'a, F> {} + impl Inner { /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. fn enqueue(&self, node: *const Node) { diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 53e866a3b6..e3ea2e3fdf 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -103,7 +103,7 @@ if_std! { mod wait; mod channel; mod split; - mod futures_unordered; + pub mod futures_unordered; mod futures_ordered; pub use self::buffered::Buffered; pub use self::buffer_unordered::BufferUnordered; @@ -112,7 +112,7 @@ if_std! { pub use self::collect::Collect; pub use self::wait::Wait; pub use self::split::{SplitStream, SplitSink}; - pub use self::futures_unordered::{futures_unordered, FuturesUnordered}; + pub use self::futures_unordered::FuturesUnordered; pub use self::futures_ordered::{futures_ordered, FuturesOrdered}; #[doc(hidden)] @@ -1102,3 +1102,27 @@ impl<'a, S: ?Sized + Stream> Stream for &'a mut S { (**self).poll() } } + +/// Converts a list of futures into a `Stream` of results from the futures. +/// +/// This function will take an list of futures (e.g. a vector, an iterator, +/// etc), and return a stream. The stream will yield items as they become +/// available on the futures internally, in the order that they become +/// available. This function is similar to `buffer_unordered` in that it may +/// return items in a different order than in the list specified. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. +#[cfg(feature = "use_std")] +pub fn futures_unordered(futures: I) -> FuturesUnordered<::Future> + where I: IntoIterator, + I::Item: IntoFuture +{ + let mut set = FuturesUnordered::new(); + + for future in futures { + set.push(future.into_future()); + } + + return set +} diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index e29f3d9609..9b8c08d01b 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -83,3 +83,45 @@ fn finished_future_ok() { assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); } + +#[test] +fn iter_mut_cancel() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]); + + for rx in stream.iter_mut() { + rx.close(); + } + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + let mut spawn = futures::executor::spawn(stream); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(None, spawn.wait_stream()); +} + +#[test] +fn iter_mut_len() { + let mut stream = futures_unordered(vec![ + futures::future::empty::<(),()>(), + futures::future::empty::<(),()>(), + futures::future::empty::<(),()>() + ]); + + let mut iter_mut = stream.iter_mut(); + assert_eq!(iter_mut.len(), 3); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 2); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 1); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 0); + assert!(iter_mut.next().is_none()); +}