Skip to content

Commit 40c32d0

Browse files
taiki-ecramertj
authored andcommitted
Add FusedStream implementations
1 parent cde791c commit 40c32d0

14 files changed

+124
-16
lines changed

futures-util/src/sink/buffer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures_core::stream::Stream;
1+
use futures_core::stream::{Stream, FusedStream};
22
use futures_core::task::{Context, Poll};
33
use futures_sink::Sink;
44
use pin_utils::{unsafe_pinned, unsafe_unpinned};
@@ -78,6 +78,12 @@ impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
7878
}
7979
}
8080

81+
impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {
82+
fn is_terminated(&self) -> bool {
83+
self.sink.is_terminated()
84+
}
85+
}
86+
8187
impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
8288
type Error = Si::Error;
8389

futures-util/src/sink/err_into.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::sink::{SinkExt, SinkMapErr};
22
use core::pin::Pin;
3-
use futures_core::stream::Stream;
3+
use futures_core::stream::{Stream, FusedStream};
44
use futures_core::task::{Context, Poll};
55
use futures_sink::{Sink};
66
use pin_utils::unsafe_pinned;
@@ -57,6 +57,7 @@ impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
5757
delegate_sink!(sink, Item);
5858
}
5959

60+
// Forwarding impl of Stream from the underlying sink
6061
impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
6162
where S: Sink<Item> + Stream,
6263
S::Error: Into<E>
@@ -70,3 +71,12 @@ impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
7071
self.sink().poll_next(cx)
7172
}
7273
}
74+
75+
impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>
76+
where S: Sink<Item> + FusedStream,
77+
S::Error: Into<E>
78+
{
79+
fn is_terminated(&self) -> bool {
80+
self.sink.is_terminated()
81+
}
82+
}

futures-util/src/sink/map_err.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use core::pin::Pin;
2-
use futures_core::stream::Stream;
2+
use futures_core::stream::{Stream, FusedStream};
33
use futures_core::task::{Context, Poll};
44
use futures_sink::{Sink};
55
use pin_utils::{unsafe_pinned, unsafe_unpinned};
@@ -85,6 +85,7 @@ impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
8585
}
8686
}
8787

88+
// Forwarding impl of Stream from the underlying sink
8889
impl<S: Stream, F> Stream for SinkMapErr<S, F> {
8990
type Item = S::Item;
9091

@@ -95,3 +96,9 @@ impl<S: Stream, F> Stream for SinkMapErr<S, F> {
9596
self.sink().poll_next(cx)
9697
}
9798
}
99+
100+
impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
101+
fn is_terminated(&self) -> bool {
102+
self.sink.is_terminated()
103+
}
104+
}

futures-util/src/sink/with_flat_map.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::fmt;
22
use core::marker::PhantomData;
33
use core::pin::Pin;
4-
use futures_core::stream::Stream;
4+
use futures_core::stream::{Stream, FusedStream};
55
use futures_core::task::{Context, Poll};
66
use futures_sink::Sink;
77
use pin_utils::{unsafe_pinned, unsafe_unpinned};
@@ -113,6 +113,7 @@ where
113113
}
114114
}
115115

116+
// Forwarding impl of Stream from the underlying sink
116117
impl<S, Item, U, St, F> Stream for WithFlatMap<S, Item, U, St, F>
117118
where
118119
S: Stream + Sink<Item>,
@@ -128,6 +129,17 @@ where
128129
}
129130
}
130131

132+
impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F>
133+
where
134+
S: FusedStream + Sink<Item>,
135+
F: FnMut(U) -> St,
136+
St: Stream<Item = Result<Item, S::Error>>,
137+
{
138+
fn is_terminated(&self) -> bool {
139+
self.sink.is_terminated()
140+
}
141+
}
142+
131143
impl<Si, Item, U, St, F> Sink<U> for WithFlatMap<Si, Item, U, St, F>
132144
where
133145
Si: Sink<Item>,

futures-util/src/stream/catch_unwind.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures_core::stream::Stream;
1+
use futures_core::stream::{Stream, FusedStream};
22
use futures_core::task::{Context, Poll};
33
use pin_utils::{unsafe_pinned, unsafe_unpinned};
44
use std::any::Any;
@@ -22,8 +22,7 @@ impl<St: Stream + UnwindSafe> CatchUnwind<St> {
2222
}
2323
}
2424

25-
impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St>
26-
{
25+
impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
2726
type Item = Result<St::Item, Box<dyn Any + Send>>;
2827

2928
fn poll_next(
@@ -47,3 +46,9 @@ impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St>
4746
}
4847
}
4948
}
49+
50+
impl<St: FusedStream + UnwindSafe> FusedStream for CatchUnwind<St> {
51+
fn is_terminated(&self) -> bool {
52+
self.caught_unwind || self.stream.is_terminated()
53+
}
54+
}

futures-util/src/stream/chunks.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::stream::Fuse;
2-
use futures_core::stream::Stream;
2+
use futures_core::stream::{Stream, FusedStream};
33
use futures_core::task::{Context, Poll};
44
#[cfg(feature = "sink")]
55
use futures_sink::Sink;
@@ -107,6 +107,12 @@ impl<St: Stream> Stream for Chunks<St> {
107107
}
108108
}
109109

110+
impl<St: FusedStream> FusedStream for Chunks<St> {
111+
fn is_terminated(&self) -> bool {
112+
self.stream.is_terminated() && self.items.is_empty()
113+
}
114+
}
115+
110116
// Forwarding impl of Sink from the underlying stream
111117
#[cfg(feature = "sink")]
112118
impl<S, Item> Sink<Item> for Chunks<S>

futures-util/src/stream/once.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::pin::Pin;
22
use futures_core::future::Future;
3-
use futures_core::stream::Stream;
3+
use futures_core::stream::{Stream, FusedStream};
44
use futures_core::task::{Context, Poll};
55
use pin_utils::unsafe_pinned;
66

@@ -51,3 +51,9 @@ impl<Fut: Future> Stream for Once<Fut> {
5151
Poll::Ready(Some(val))
5252
}
5353
}
54+
55+
impl<Fut: Future> FusedStream for Once<Fut> {
56+
fn is_terminated(&self) -> bool {
57+
self.future.is_none()
58+
}
59+
}

futures-util/src/stream/repeat.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use core::pin::Pin;
2-
use futures_core::stream::Stream;
2+
use futures_core::stream::{Stream, FusedStream};
33
use futures_core::task::{Context, Poll};
44

55
/// Stream for the [`repeat`] function.
@@ -40,3 +40,11 @@ impl<T> Stream for Repeat<T>
4040
Poll::Ready(Some(self.item.clone()))
4141
}
4242
}
43+
44+
impl<T> FusedStream for Repeat<T>
45+
where T: Clone,
46+
{
47+
fn is_terminated(&self) -> bool {
48+
false
49+
}
50+
}

futures-util/src/stream/skip_while.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
9595
Fut: Future<Output = bool>,
9696
{
9797
fn is_terminated(&self) -> bool {
98-
self.stream.is_terminated()
98+
self.pending_item.is_none() && self.stream.is_terminated()
9999
}
100100
}
101101

futures-util/src/stream/take.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use core::pin::Pin;
2-
use futures_core::stream::Stream;
2+
use futures_core::stream::{Stream, FusedStream};
33
use futures_core::task::{Context, Poll};
44
#[cfg(feature = "sink")]
55
use futures_sink::Sink;
@@ -81,6 +81,14 @@ impl<St> Stream for Take<St>
8181
}
8282
}
8383

84+
impl<St> FusedStream for Take<St>
85+
where St: FusedStream,
86+
{
87+
fn is_terminated(&self) -> bool {
88+
self.remaining == 0 || self.stream.is_terminated()
89+
}
90+
}
91+
8492
// Forwarding impl of Sink from the underlying stream
8593
#[cfg(feature = "sink")]
8694
impl<S, Item> Sink<Item> for Take<S>

futures-util/src/stream/take_while.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::fmt;
22
use core::pin::Pin;
33
use futures_core::future::Future;
4-
use futures_core::stream::Stream;
4+
use futures_core::stream::{Stream, FusedStream};
55
use futures_core::task::{Context, Poll};
66
#[cfg(feature = "sink")]
77
use futures_sink::Sink;
@@ -129,6 +129,16 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
129129
}
130130
}
131131

132+
impl<St, Fut, F> FusedStream for TakeWhile<St, Fut, F>
133+
where St: FusedStream,
134+
F: FnMut(&St::Item) -> Fut,
135+
Fut: Future<Output = bool>,
136+
{
137+
fn is_terminated(&self) -> bool {
138+
self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
139+
}
140+
}
141+
132142
// Forwarding impl of Sink from the underlying stream
133143
#[cfg(feature = "sink")]
134144
impl<S, Fut, F, Item> Sink<Item> for TakeWhile<S, Fut, F>

futures-util/src/try_stream/and_then.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::fmt;
22
use core::pin::Pin;
33
use futures_core::future::TryFuture;
4-
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::stream::{Stream, TryStream, FusedStream};
55
use futures_core::task::{Context, Poll};
66
#[cfg(feature = "sink")]
77
use futures_sink::Sink;
@@ -104,6 +104,16 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
104104
}
105105
}
106106

107+
impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
108+
where St: TryStream + FusedStream,
109+
F: FnMut(St::Ok) -> Fut,
110+
Fut: TryFuture<Error = St::Error>,
111+
{
112+
fn is_terminated(&self) -> bool {
113+
self.future.is_none() && self.stream.is_terminated()
114+
}
115+
}
116+
107117
// Forwarding impl of Sink from the underlying stream
108118
#[cfg(feature = "sink")]
109119
impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F>

futures-util/src/try_stream/or_else.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::fmt;
22
use core::pin::Pin;
33
use futures_core::future::TryFuture;
4-
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::stream::{Stream, TryStream, FusedStream};
55
use futures_core::task::{Context, Poll};
66
#[cfg(feature = "sink")]
77
use futures_sink::Sink;
@@ -105,6 +105,16 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
105105
}
106106
}
107107

108+
impl<St, Fut, F> FusedStream for OrElse<St, Fut, F>
109+
where St: TryStream + FusedStream,
110+
F: FnMut(St::Error) -> Fut,
111+
Fut: TryFuture<Ok = St::Ok>,
112+
{
113+
fn is_terminated(&self) -> bool {
114+
self.future.is_none() && self.stream.is_terminated()
115+
}
116+
}
117+
108118
// Forwarding impl of Sink from the underlying stream
109119
#[cfg(feature = "sink")]
110120
impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F>

futures-util/src/try_stream/try_skip_while.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::fmt;
22
use core::pin::Pin;
33
use futures_core::future::TryFuture;
4-
use futures_core::stream::{Stream, TryStream};
4+
use futures_core::stream::{Stream, TryStream, FusedStream};
55
use futures_core::task::{Context, Poll};
66
#[cfg(feature = "sink")]
77
use futures_sink::Sink;
@@ -133,6 +133,16 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
133133
}
134134
}
135135

136+
impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F>
137+
where St: TryStream + FusedStream,
138+
F: FnMut(&St::Ok) -> Fut,
139+
Fut: TryFuture<Ok = bool, Error = St::Error>,
140+
{
141+
fn is_terminated(&self) -> bool {
142+
self.pending_item.is_none() && self.stream.is_terminated()
143+
}
144+
}
145+
136146
// Forwarding impl of Sink from the underlying stream
137147
#[cfg(feature = "sink")]
138148
impl<S, Fut, F, Item, E> Sink<Item> for TrySkipWhile<S, Fut, F>

0 commit comments

Comments
 (0)