Skip to content

Commit aea61c5

Browse files
Add StreamExt::try_scan
This is the `try_` version of `StreamExt::scan` from master, where the accumulator argument is by value as opposed to by reference, see #2171. Additionally, it's placed on `StreamExt` as opposed to `TryStreamExt`, see #2342.
1 parent 63f0682 commit aea61c5

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed

futures-util/src/stream/stream/mod.rs

+51
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use core::pin::Pin;
1414
use futures_core::stream::TryStream;
1515
#[cfg(feature = "alloc")]
1616
use futures_core::stream::{BoxStream, LocalBoxStream};
17+
use futures_core::TryFuture;
1718
use futures_core::{
1819
future::Future,
1920
stream::{FusedStream, Stream},
@@ -181,6 +182,10 @@ mod scan;
181182
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
182183
pub use self::scan::Scan;
183184

185+
mod try_scan;
186+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
187+
pub use self::try_scan::TryScan;
188+
184189
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
185190
#[cfg(feature = "alloc")]
186191
mod buffer_unordered;
@@ -1055,6 +1060,52 @@ pub trait StreamExt: Stream {
10551060
assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
10561061
}
10571062

1063+
/// Combinator similar to [`fold`](StreamExt::fold) that holds internal state
1064+
/// and produces a new stream.
1065+
///
1066+
/// Accepts initial state and closure which will be applied to each element
1067+
/// of the stream until provided closure returns `None`. Once `None` is
1068+
/// returned, stream will be terminated.
1069+
///
1070+
/// This method is similar to [`scan`](StreamExt::scan), but will
1071+
/// exit early if an error is encountered in either the stream or the
1072+
/// provided closure.
1073+
///
1074+
/// # Examples
1075+
///
1076+
/// ```
1077+
/// # futures::executor::block_on(async {
1078+
/// use futures::future;
1079+
/// use futures::stream::{self, StreamExt, TryStreamExt};
1080+
///
1081+
/// let stream = stream::iter(1..=10);
1082+
///
1083+
/// let stream = stream.try_scan(0, |mut state, x| {
1084+
/// state += x;
1085+
/// future::ready(if state < 10 { Ok::<_, ()>(Some((state, x))) } else { Ok(None) })
1086+
/// });
1087+
///
1088+
/// assert_eq!(Ok(vec![1, 2, 3]), stream.try_collect::<Vec<_>>().await);
1089+
///
1090+
/// let stream = stream::iter(1..=10);
1091+
///
1092+
/// let stream = stream.try_scan(0, |mut state, x| {
1093+
/// state += x;
1094+
/// future::ready(if state < 10 { Ok(Some((state, x))) } else { Err(()) })
1095+
/// });
1096+
///
1097+
/// assert_eq!(Err(()), stream.try_collect::<Vec<_>>().await);
1098+
/// # });
1099+
/// ```
1100+
fn try_scan<S, B, Fut, F>(self, initial_state: S, f: F) -> TryScan<Self, S, Fut, F>
1101+
where
1102+
F: FnMut(S, Self::Item) -> Fut,
1103+
Fut: TryFuture<Ok = Option<(S, B)>>,
1104+
Self: Sized,
1105+
{
1106+
assert_stream::<Result<B, Fut::Error>, _>(TryScan::new(self, initial_state, f))
1107+
}
1108+
10581109
/// Runs this stream to completion, executing the provided asynchronous
10591110
/// closure for each element on the stream.
10601111
///
+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use crate::unfold_state::UnfoldState;
2+
use core::fmt;
3+
use core::pin::Pin;
4+
use futures_core::stream::{FusedStream, Stream};
5+
use futures_core::task::{Context, Poll};
6+
use futures_core::{ready, TryFuture};
7+
#[cfg(feature = "sink")]
8+
use futures_sink::Sink;
9+
use pin_project_lite::pin_project;
10+
11+
pin_project! {
12+
/// TryStream for the [`try_scan`](super::StreamExt::try_scan) method.
13+
#[must_use = "streams do nothing unless polled"]
14+
pub struct TryScan<St, S, Fut, F> {
15+
#[pin]
16+
stream: St,
17+
f: F,
18+
#[pin]
19+
state: UnfoldState<S, Fut>,
20+
}
21+
}
22+
23+
impl<St, S, Fut, F> fmt::Debug for TryScan<St, S, Fut, F>
24+
where
25+
St: Stream + fmt::Debug,
26+
St::Item: fmt::Debug,
27+
S: fmt::Debug,
28+
Fut: fmt::Debug,
29+
{
30+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31+
f.debug_struct("TryScan")
32+
.field("stream", &self.stream)
33+
.field("state", &self.state)
34+
.field("done_taking", &self.is_done_taking())
35+
.finish()
36+
}
37+
}
38+
39+
impl<St: Stream, S, Fut, F> TryScan<St, S, Fut, F> {
40+
/// Checks if internal state is `None`.
41+
fn is_done_taking(&self) -> bool {
42+
matches!(self.state, UnfoldState::Empty)
43+
}
44+
}
45+
46+
impl<B, St, S, Fut, F> TryScan<St, S, Fut, F>
47+
where
48+
St: Stream,
49+
F: FnMut(S, St::Item) -> Fut,
50+
Fut: TryFuture<Ok = Option<(S, B)>>,
51+
{
52+
pub(super) fn new(stream: St, initial_state: S, f: F) -> Self {
53+
Self { stream, f, state: UnfoldState::Value { value: initial_state } }
54+
}
55+
}
56+
57+
impl<B, St, S, Fut, F> Stream for TryScan<St, S, Fut, F>
58+
where
59+
St: Stream,
60+
F: FnMut(S, St::Item) -> Fut,
61+
Fut: TryFuture<Ok = Option<(S, B)>>,
62+
{
63+
type Item = Result<B, Fut::Error>;
64+
65+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66+
if self.is_done_taking() {
67+
return Poll::Ready(None);
68+
}
69+
70+
let mut this = self.project();
71+
72+
Poll::Ready(loop {
73+
if let Some(fut) = this.state.as_mut().project_future() {
74+
match ready!(fut.try_poll(cx)) {
75+
Ok(None) => {
76+
this.state.set(UnfoldState::Empty);
77+
break None;
78+
}
79+
Ok(Some((state, item))) => {
80+
this.state.set(UnfoldState::Value { value: state });
81+
break Some(Ok(item));
82+
}
83+
Err(e) => {
84+
this.state.set(UnfoldState::Empty);
85+
break Some(Err(e));
86+
}
87+
}
88+
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
89+
let state = this.state.as_mut().take_value().unwrap();
90+
this.state.set(UnfoldState::Future { future: (this.f)(state, item) })
91+
} else {
92+
break None;
93+
}
94+
})
95+
}
96+
97+
fn size_hint(&self) -> (usize, Option<usize>) {
98+
if self.is_done_taking() {
99+
(0, Some(0))
100+
} else {
101+
self.stream.size_hint() // can't know a lower bound, due to the predicate
102+
}
103+
}
104+
}
105+
106+
impl<B, St, S, Fut, F> FusedStream for TryScan<St, S, Fut, F>
107+
where
108+
St: FusedStream,
109+
F: FnMut(S, St::Item) -> Fut,
110+
Fut: TryFuture<Ok = Option<(S, B)>>,
111+
{
112+
fn is_terminated(&self) -> bool {
113+
self.is_done_taking()
114+
|| !matches!(self.state, UnfoldState::Future { .. }) && self.stream.is_terminated()
115+
}
116+
}
117+
118+
// Forwarding impl of Sink from the underlying stream
119+
#[cfg(feature = "sink")]
120+
impl<St, S, Fut, F, Item> Sink<Item> for TryScan<St, S, Fut, F>
121+
where
122+
St: Stream + Sink<Item>,
123+
{
124+
type Error = St::Error;
125+
126+
delegate_sink!(stream, Item);
127+
}

0 commit comments

Comments
 (0)