Skip to content

Commit 5ea04ca

Browse files
authored
Change SelectAll iterators to return stream instead of StreamFuture (#2431)
1 parent 5818cca commit 5ea04ca

File tree

4 files changed

+189
-38
lines changed

4 files changed

+189
-38
lines changed

futures-util/src/stream/futures_unordered/iter.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,33 @@ use core::marker::PhantomData;
44
use core::pin::Pin;
55
use core::sync::atomic::Ordering::Relaxed;
66

7-
#[derive(Debug)]
87
/// Mutable iterator over all futures in the unordered set.
8+
#[derive(Debug)]
99
pub struct IterPinMut<'a, Fut> {
1010
pub(super) task: *const Task<Fut>,
1111
pub(super) len: usize,
1212
pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
1313
}
1414

15-
#[derive(Debug)]
1615
/// Mutable iterator over all futures in the unordered set.
16+
#[derive(Debug)]
1717
pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);
1818

19-
#[derive(Debug)]
2019
/// Immutable iterator over all futures in the unordered set.
20+
#[derive(Debug)]
2121
pub struct IterPinRef<'a, Fut> {
2222
pub(super) task: *const Task<Fut>,
2323
pub(super) len: usize,
2424
pub(super) pending_next_all: *mut Task<Fut>,
2525
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
2626
}
2727

28-
#[derive(Debug)]
2928
/// Immutable iterator over all the futures in the unordered set.
29+
#[derive(Debug)]
3030
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
3131

32-
#[derive(Debug)]
3332
/// Owned iterator over all futures in the unordered set.
33+
#[derive(Debug)]
3434
pub struct IntoIter<Fut: Unpin> {
3535
pub(super) len: usize,
3636
pub(super) inner: FuturesUnordered<Fut>,
@@ -39,7 +39,7 @@ pub struct IntoIter<Fut: Unpin> {
3939
impl<Fut: Unpin> Iterator for IntoIter<Fut> {
4040
type Item = Fut;
4141

42-
fn next(&mut self) -> Option<Fut> {
42+
fn next(&mut self) -> Option<Self::Item> {
4343
// `head_all` can be accessed directly and we don't need to spin on
4444
// `Task::next_all` since we have exclusive access to the set.
4545
let task = self.inner.head_all.get_mut();
@@ -73,7 +73,7 @@ impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}
7373
impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
7474
type Item = Pin<&'a mut Fut>;
7575

76-
fn next(&mut self) -> Option<Pin<&'a mut Fut>> {
76+
fn next(&mut self) -> Option<Self::Item> {
7777
if self.task.is_null() {
7878
return None;
7979
}
@@ -102,7 +102,7 @@ impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}
102102
impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
103103
type Item = &'a mut Fut;
104104

105-
fn next(&mut self) -> Option<&'a mut Fut> {
105+
fn next(&mut self) -> Option<Self::Item> {
106106
self.0.next().map(Pin::get_mut)
107107
}
108108

@@ -116,7 +116,7 @@ impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}
116116
impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
117117
type Item = Pin<&'a Fut>;
118118

119-
fn next(&mut self) -> Option<Pin<&'a Fut>> {
119+
fn next(&mut self) -> Option<Self::Item> {
120120
if self.task.is_null() {
121121
return None;
122122
}
@@ -145,7 +145,7 @@ impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}
145145
impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
146146
type Item = &'a Fut;
147147

148-
fn next(&mut self) -> Option<&'a Fut> {
148+
fn next(&mut self) -> Option<Self::Item> {
149149
self.0.next().map(Pin::get_ref)
150150
}
151151

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ cfg_target_has_atomic! {
105105
pub use self::futures_unordered::FuturesUnordered;
106106

107107
#[cfg(feature = "alloc")]
108-
mod select_all;
108+
pub mod select_all;
109109
#[cfg(feature = "alloc")]
110110
pub use self::select_all::{select_all, SelectAll};
111111

futures-util/src/stream/select_all.rs

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use futures_core::task::{Context, Poll};
1111
use pin_project_lite::pin_project;
1212

1313
use super::assert_stream;
14-
use crate::stream::futures_unordered::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};
15-
use crate::stream::{FuturesUnordered, StreamExt, StreamFuture};
14+
use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture};
1615

1716
pin_project! {
1817
/// An unbounded set of streams
@@ -71,27 +70,17 @@ impl<St: Stream + Unpin> SelectAll<St> {
7170
self.inner.push(stream.into_future());
7271
}
7372

74-
/// Returns an iterator that allows inspecting each future in the set.
75-
pub fn iter(&self) -> Iter<'_, StreamFuture<St>> {
76-
self.inner.iter()
73+
/// Returns an iterator that allows inspecting each stream in the set.
74+
pub fn iter(&self) -> Iter<'_, St> {
75+
Iter(self.inner.iter())
7776
}
7877

79-
/// Returns an iterator that allows inspecting each future in the set.
80-
pub fn iter_pin_ref(self: Pin<&'_ Self>) -> IterPinRef<'_, StreamFuture<St>> {
81-
self.project_ref().inner.iter_pin_ref()
78+
/// Returns an iterator that allows modifying each stream in the set.
79+
pub fn iter_mut(&mut self) -> IterMut<'_, St> {
80+
IterMut(self.inner.iter_mut())
8281
}
8382

84-
/// Returns an iterator that allows modifying each future in the set.
85-
pub fn iter_mut(&mut self) -> IterMut<'_, StreamFuture<St>> {
86-
self.inner.iter_mut()
87-
}
88-
89-
/// Returns an iterator that allows modifying each future in the set.
90-
pub fn iter_pin_mut(self: Pin<&mut Self>) -> IterPinMut<'_, StreamFuture<St>> {
91-
self.project().inner.iter_pin_mut()
92-
}
93-
94-
/// Clears the set, removing all futures.
83+
/// Clears the set, removing all streams.
9584
pub fn clear(&mut self) {
9685
self.inner.clear()
9786
}
@@ -139,7 +128,7 @@ impl<St: Stream + Unpin> FusedStream for SelectAll<St> {
139128
/// streams internally, in the order they become available.
140129
///
141130
/// Note that the returned set can also be used to dynamically push more
142-
/// futures into the set as they become available.
131+
/// streams into the set as they become available.
143132
///
144133
/// This function is only available when the `std` or `alloc` feature of this
145134
/// library is activated, and it is activated by default.
@@ -172,28 +161,94 @@ impl<St: Stream + Unpin> Extend<St> for SelectAll<St> {
172161
}
173162

174163
impl<St: Stream + Unpin> IntoIterator for SelectAll<St> {
175-
type Item = StreamFuture<St>;
176-
type IntoIter = IntoIter<StreamFuture<St>>;
164+
type Item = St;
165+
type IntoIter = IntoIter<St>;
177166

178167
fn into_iter(self) -> Self::IntoIter {
179-
self.inner.into_iter()
168+
IntoIter(self.inner.into_iter())
180169
}
181170
}
182171

183172
impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll<St> {
184-
type Item = &'a StreamFuture<St>;
185-
type IntoIter = Iter<'a, StreamFuture<St>>;
173+
type Item = &'a St;
174+
type IntoIter = Iter<'a, St>;
186175

187176
fn into_iter(self) -> Self::IntoIter {
188177
self.iter()
189178
}
190179
}
191180

192181
impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll<St> {
193-
type Item = &'a mut StreamFuture<St>;
194-
type IntoIter = IterMut<'a, StreamFuture<St>>;
182+
type Item = &'a mut St;
183+
type IntoIter = IterMut<'a, St>;
195184

196185
fn into_iter(self) -> Self::IntoIter {
197186
self.iter_mut()
198187
}
199188
}
189+
190+
/// Immutable iterator over all streams in the unordered set.
191+
#[derive(Debug)]
192+
pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture<St>>);
193+
194+
/// Mutable iterator over all streams in the unordered set.
195+
#[derive(Debug)]
196+
pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture<St>>);
197+
198+
/// Owned iterator over all streams in the unordered set.
199+
#[derive(Debug)]
200+
pub struct IntoIter<St: Unpin>(futures_unordered::IntoIter<StreamFuture<St>>);
201+
202+
impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> {
203+
type Item = &'a St;
204+
205+
fn next(&mut self) -> Option<Self::Item> {
206+
let st = self.0.next()?;
207+
let next = st.get_ref();
208+
// This should always be true because FuturesUnordered removes completed futures.
209+
debug_assert!(next.is_some());
210+
next
211+
}
212+
213+
fn size_hint(&self) -> (usize, Option<usize>) {
214+
self.0.size_hint()
215+
}
216+
}
217+
218+
impl<St: Stream + Unpin> ExactSizeIterator for Iter<'_, St> {}
219+
220+
impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> {
221+
type Item = &'a mut St;
222+
223+
fn next(&mut self) -> Option<Self::Item> {
224+
let st = self.0.next()?;
225+
let next = st.get_mut();
226+
// This should always be true because FuturesUnordered removes completed futures.
227+
debug_assert!(next.is_some());
228+
next
229+
}
230+
231+
fn size_hint(&self) -> (usize, Option<usize>) {
232+
self.0.size_hint()
233+
}
234+
}
235+
236+
impl<St: Stream + Unpin> ExactSizeIterator for IterMut<'_, St> {}
237+
238+
impl<St: Stream + Unpin> Iterator for IntoIter<St> {
239+
type Item = St;
240+
241+
fn next(&mut self) -> Option<Self::Item> {
242+
let st = self.0.next()?;
243+
let next = st.into_inner();
244+
// This should always be true because FuturesUnordered removes completed futures.
245+
debug_assert!(next.is_some());
246+
next
247+
}
248+
249+
fn size_hint(&self) -> (usize, Option<usize>) {
250+
self.0.size_hint()
251+
}
252+
}
253+
254+
impl<St: Stream + Unpin> ExactSizeIterator for IntoIter<St> {}

futures/tests/stream_select_all.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,99 @@ fn clear() {
9999
tasks.clear();
100100
assert!(!tasks.is_terminated());
101101
}
102+
103+
#[test]
104+
fn iter_mut() {
105+
let mut stream =
106+
vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
107+
.into_iter()
108+
.collect::<SelectAll<_>>();
109+
110+
let mut iter = stream.iter_mut();
111+
assert_eq!(iter.len(), 3);
112+
assert!(iter.next().is_some());
113+
assert_eq!(iter.len(), 2);
114+
assert!(iter.next().is_some());
115+
assert_eq!(iter.len(), 1);
116+
assert!(iter.next().is_some());
117+
assert_eq!(iter.len(), 0);
118+
assert!(iter.next().is_none());
119+
120+
let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
121+
.into_iter()
122+
.collect::<SelectAll<_>>();
123+
124+
assert_eq!(stream.len(), 3);
125+
assert_eq!(block_on(stream.next()), Some(1));
126+
assert_eq!(stream.len(), 2);
127+
let mut iter = stream.iter_mut();
128+
assert_eq!(iter.len(), 2);
129+
assert!(iter.next().is_some());
130+
assert_eq!(iter.len(), 1);
131+
assert!(iter.next().is_some());
132+
assert_eq!(iter.len(), 0);
133+
assert!(iter.next().is_none());
134+
135+
assert_eq!(block_on(stream.next()), Some(2));
136+
assert_eq!(stream.len(), 2);
137+
assert_eq!(block_on(stream.next()), None);
138+
let mut iter = stream.iter_mut();
139+
assert_eq!(iter.len(), 0);
140+
assert!(iter.next().is_none());
141+
}
142+
143+
#[test]
144+
fn iter() {
145+
let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
146+
.into_iter()
147+
.collect::<SelectAll<_>>();
148+
149+
let mut iter = stream.iter();
150+
assert_eq!(iter.len(), 3);
151+
assert!(iter.next().is_some());
152+
assert_eq!(iter.len(), 2);
153+
assert!(iter.next().is_some());
154+
assert_eq!(iter.len(), 1);
155+
assert!(iter.next().is_some());
156+
assert_eq!(iter.len(), 0);
157+
assert!(iter.next().is_none());
158+
159+
let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
160+
.into_iter()
161+
.collect::<SelectAll<_>>();
162+
163+
assert_eq!(stream.len(), 3);
164+
assert_eq!(block_on(stream.next()), Some(1));
165+
assert_eq!(stream.len(), 2);
166+
let mut iter = stream.iter();
167+
assert_eq!(iter.len(), 2);
168+
assert!(iter.next().is_some());
169+
assert_eq!(iter.len(), 1);
170+
assert!(iter.next().is_some());
171+
assert_eq!(iter.len(), 0);
172+
assert!(iter.next().is_none());
173+
174+
assert_eq!(block_on(stream.next()), Some(2));
175+
assert_eq!(stream.len(), 2);
176+
assert_eq!(block_on(stream.next()), None);
177+
let mut iter = stream.iter();
178+
assert_eq!(iter.len(), 0);
179+
assert!(iter.next().is_none());
180+
}
181+
182+
#[test]
183+
fn into_iter() {
184+
let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
185+
.into_iter()
186+
.collect::<SelectAll<_>>();
187+
188+
let mut iter = stream.into_iter();
189+
assert_eq!(iter.len(), 3);
190+
assert!(iter.next().is_some());
191+
assert_eq!(iter.len(), 2);
192+
assert!(iter.next().is_some());
193+
assert_eq!(iter.len(), 1);
194+
assert!(iter.next().is_some());
195+
assert_eq!(iter.len(), 0);
196+
assert!(iter.next().is_none());
197+
}

0 commit comments

Comments
 (0)