Skip to content

Commit 37fb821

Browse files
Merge pull request #1162 from Nemo157/extended-compat
Extend compatibility to Stream and Sink
2 parents 4e6793d + a5b7397 commit 37fb821

File tree

11 files changed

+198
-42
lines changed

11 files changed

+198
-42
lines changed

.rustfmt.toml

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
max_width = 80
2+
comment_width = 80

.travis.yml

+11
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ matrix:
3535
- cargo build --manifest-path futures-sink/Cargo.toml --no-default-features
3636
- cargo build --manifest-path futures-util/Cargo.toml --no-default-features
3737

38+
- name: cargo build --all-features
39+
rust: nightly
40+
script:
41+
- cargo build --manifest-path futures/Cargo.toml --all-features
42+
- cargo build --manifest-path futures-core/Cargo.toml --all-features
43+
- cargo build --manifest-path futures-channel/Cargo.toml --all-features
44+
- cargo build --manifest-path futures-executor/Cargo.toml --all-features
45+
- cargo build --manifest-path futures-io/Cargo.toml --all-features
46+
- cargo build --manifest-path futures-sink/Cargo.toml --all-features
47+
- cargo build --manifest-path futures-util/Cargo.toml --all-features
48+
3849
- name: cargo build --target=thumbv6m-none-eabi
3950
rust: nightly
4051
install:

futures-util/src/compat/compat.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
/// Converts a futures 0.3 `TryFuture` into a futures 0.1 `Future`
2-
/// and vice versa.
1+
/// Converts a futures 0.3 `TryFuture`, `TryStream` or `Sink` into a futures 0.1
2+
/// `Future` and vice versa.
33
#[derive(Debug)]
44
#[must_use = "futures do nothing unless polled"]
5-
pub struct Compat<Fut, Ex> {
6-
crate future: Fut,
5+
pub struct Compat<T, Ex> {
6+
crate inner: T,
77
crate executor: Option<Ex>,
88
}
99

10-
impl<Fut, Ex> Compat<Fut, Ex> {
11-
/// Returns the inner future.
12-
pub fn into_inner(self) -> Fut {
13-
self.future
10+
impl<T, Ex> Compat<T, Ex> {
11+
/// Returns the inner item.
12+
pub fn into_inner(self) -> T {
13+
self.inner
1414
}
1515

1616
/// Creates a new `Compat`.
17-
crate fn new(future: Fut, executor: Option<Ex>) -> Compat<Fut, Ex> {
18-
Compat { future, executor }
17+
crate fn new(inner: T, executor: Option<Ex>) -> Compat<T, Ex> {
18+
Compat { inner, executor }
1919
}
2020
}

futures-util/src/compat/compat01to03.rs

+34-16
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,48 @@
11
use super::Compat;
2-
use futures::Async as Async01;
3-
use futures::Future as Future01;
4-
use futures::executor::{self as executor01, NotifyHandle as NotifyHandle01,
5-
Notify as Notify01, UnsafeNotify as UnsafeNotify01};
6-
use futures_core::Future as Future03;
7-
use futures_core::task as task03;
2+
use futures::{
3+
executor::{
4+
self as executor01, Notify as Notify01, NotifyHandle as NotifyHandle01,
5+
UnsafeNotify as UnsafeNotify01,
6+
},
7+
Async as Async01, Future as Future01, Stream as Stream01,
8+
};
9+
use futures_core::{task as task03, Future as Future03, Stream as Stream03};
810
use std::mem::PinMut;
911

1012
impl<Fut: Future01> Future03 for Compat<Fut, ()> {
1113
type Output = Result<Fut::Item, Fut::Error>;
1214

1315
fn poll(
1416
self: PinMut<Self>,
15-
cx: &mut task03::Context
17+
cx: &mut task03::Context,
1618
) -> task03::Poll<Self::Output> {
1719
let notify = &WakerToHandle(cx.waker());
1820

1921
executor01::with_notify(notify, 0, move || {
20-
unsafe {
21-
match PinMut::get_mut_unchecked(self).future.poll() {
22-
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
23-
Ok(Async01::NotReady) => task03::Poll::Pending,
24-
Err(e) => task03::Poll::Ready(Err(e)),
25-
}
22+
match unsafe { PinMut::get_mut_unchecked(self) }.inner.poll() {
23+
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
24+
Ok(Async01::NotReady) => task03::Poll::Pending,
25+
Err(e) => task03::Poll::Ready(Err(e)),
26+
}
27+
})
28+
}
29+
}
30+
31+
impl<St: Stream01> Stream03 for Compat<St, ()> {
32+
type Item = Result<St::Item, St::Error>;
33+
34+
fn poll_next(
35+
self: PinMut<Self>,
36+
cx: &mut task03::Context,
37+
) -> task03::Poll<Option<Self::Item>> {
38+
let notify = &WakerToHandle(cx.waker());
39+
40+
executor01::with_notify(notify, 0, move || {
41+
match unsafe { PinMut::get_mut_unchecked(self) }.inner.poll() {
42+
Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))),
43+
Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
44+
Ok(Async01::NotReady) => task03::Poll::Pending,
45+
Err(e) => task03::Poll::Ready(Some(Err(e))),
2646
}
2747
})
2848
}
@@ -37,9 +57,7 @@ impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {
3757
fn from(handle: WakerToHandle<'a>) -> NotifyHandle01 {
3858
let ptr = Box::new(NotifyWaker(handle.0.clone()));
3959

40-
unsafe {
41-
NotifyHandle01::new(Box::into_raw(ptr))
42-
}
60+
unsafe { NotifyHandle01::new(Box::into_raw(ptr)) }
4361
}
4462
}
4563

+85-15
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,87 @@
11
use super::Compat;
2-
use futures::Future as Future01;
3-
use futures::Poll as Poll01;
4-
use futures::task as task01;
5-
use futures::Async as Async01;
6-
use futures_core::TryFuture as TryFuture03;
7-
use futures_core::task as task03;
8-
use std::marker::Unpin;
9-
use std::mem::PinMut;
10-
use std::sync::Arc;
2+
use futures::{
3+
task as task01, Async as Async01, AsyncSink as AsyncSink01,
4+
Future as Future01, Poll as Poll01, Sink as Sink01,
5+
StartSend as StartSend01, Stream as Stream01,
6+
};
7+
use futures_core::{
8+
task as task03, TryFuture as TryFuture03, TryStream as TryStream03,
9+
};
10+
use futures_sink::Sink as Sink03;
11+
use std::{marker::Unpin, mem::PinMut, sync::Arc};
1112

1213
impl<Fut, Ex> Future01 for Compat<Fut, Ex>
13-
where Fut: TryFuture03 + Unpin,
14-
Ex: task03::Executor
14+
where
15+
Fut: TryFuture03 + Unpin,
16+
Ex: task03::Executor,
1517
{
1618
type Item = Fut::Ok;
1719
type Error = Fut::Error;
1820

1921
fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
20-
let waker = current_as_waker();
21-
let mut cx = task03::Context::new(&waker, self.executor.as_mut().unwrap());
22-
match PinMut::new(&mut self.future).try_poll(&mut cx) {
22+
with_context(self, |inner, cx| match inner.try_poll(cx) {
2323
task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)),
2424
task03::Poll::Pending => Ok(Async01::NotReady),
2525
task03::Poll::Ready(Err(e)) => Err(e),
26-
}
26+
})
27+
}
28+
}
29+
30+
impl<St, Ex> Stream01 for Compat<St, Ex>
31+
where
32+
St: TryStream03 + Unpin,
33+
Ex: task03::Executor,
34+
{
35+
type Item = St::Ok;
36+
type Error = St::Error;
37+
38+
fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
39+
with_context(self, |inner, cx| match inner.try_poll_next(cx) {
40+
task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
41+
task03::Poll::Ready(Some(Ok(t))) => Ok(Async01::Ready(Some(t))),
42+
task03::Poll::Pending => Ok(Async01::NotReady),
43+
task03::Poll::Ready(Some(Err(e))) => Err(e),
44+
})
45+
}
46+
}
47+
48+
impl<T, E> Sink01 for Compat<T, E>
49+
where
50+
T: Sink03 + Unpin,
51+
E: task03::Executor,
52+
{
53+
type SinkItem = T::SinkItem;
54+
type SinkError = T::SinkError;
55+
56+
fn start_send(
57+
&mut self,
58+
item: Self::SinkItem,
59+
) -> StartSend01<Self::SinkItem, Self::SinkError> {
60+
with_context(self, |mut inner, cx| {
61+
match inner.reborrow().poll_ready(cx) {
62+
task03::Poll::Ready(Ok(())) => {
63+
inner.start_send(item).map(|()| AsyncSink01::Ready)
64+
}
65+
task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
66+
task03::Poll::Ready(Err(e)) => Err(e),
67+
}
68+
})
69+
}
70+
71+
fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
72+
with_context(self, |inner, cx| match inner.poll_flush(cx) {
73+
task03::Poll::Ready(Ok(())) => Ok(Async01::Ready(())),
74+
task03::Poll::Pending => Ok(Async01::NotReady),
75+
task03::Poll::Ready(Err(e)) => Err(e),
76+
})
77+
}
78+
79+
fn close(&mut self) -> Poll01<(), Self::SinkError> {
80+
with_context(self, |inner, cx| match inner.poll_close(cx) {
81+
task03::Poll::Ready(Ok(())) => Ok(Async01::Ready(())),
82+
task03::Poll::Pending => Ok(Async01::NotReady),
83+
task03::Poll::Ready(Err(e)) => Err(e),
84+
})
2785
}
2886
}
2987

@@ -39,3 +97,15 @@ impl task03::Wake for Current {
3997
arc_self.0.notify();
4098
}
4199
}
100+
101+
fn with_context<T, E, R, F>(compat: &mut Compat<T, E>, f: F) -> R
102+
where
103+
T: Unpin,
104+
E: task03::Executor,
105+
F: FnOnce(PinMut<T>, &mut task03::Context) -> R,
106+
{
107+
let waker = current_as_waker();
108+
let executor = compat.executor.as_mut().unwrap();
109+
let mut cx = task03::Context::new(&waker, executor);
110+
f(PinMut::new(&mut compat.inner), &mut cx)
111+
}

futures-util/src/compat/future01ext.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub trait Future01CompatExt: Future01 {
99
/// futures 0.3 `Future<Output = Result<T, E>>`.
1010
fn compat(self) -> Compat<Self, ()> where Self: Sized {
1111
Compat {
12-
future: self,
12+
inner: self,
1313
executor: None,
1414
}
1515
}

futures-util/src/compat/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@ mod compat03to01;
1313

1414
mod future01ext;
1515
pub use self::future01ext::Future01CompatExt;
16+
17+
mod stream01ext;
18+
pub use self::stream01ext::Stream01CompatExt;
+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use super::Compat;
2+
use futures::Stream as Stream01;
3+
4+
impl<St: Stream01> Stream01CompatExt for St {}
5+
6+
/// Extension trait for futures 0.1 [`Stream`][Stream01]
7+
pub trait Stream01CompatExt: Stream01 {
8+
/// Converts a futures 0.1 [`Stream<Item = T, Error = E>`][Stream01] into a
9+
/// futures 0.3 [`Stream<Item = Result<T, E>>`][Stream03].
10+
fn compat(self) -> Compat<Self, ()>
11+
where
12+
Self: Sized,
13+
{
14+
Compat {
15+
inner: self,
16+
executor: None,
17+
}
18+
}
19+
}

futures-util/src/sink/mod.rs

+16
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ use futures_core::future::Future;
99
use futures_core::stream::Stream;
1010
use futures_sink::Sink;
1111

12+
#[cfg(feature = "compat")]
13+
use crate::compat::Compat;
14+
15+
#[cfg(feature = "compat")]
16+
use futures_core::task::Executor;
17+
1218
mod close;
1319
pub use self::close::Close;
1420

@@ -248,4 +254,14 @@ pub trait SinkExt: Sink {
248254
{
249255
Either::Right(self)
250256
}
257+
258+
/// Wraps a [`Sink`] into a sink compatible with libraries using
259+
/// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
260+
#[cfg(feature = "compat")]
261+
fn compat<E>(self, executor: E) -> Compat<Self, E>
262+
where Self: Sized + Unpin,
263+
E: Executor,
264+
{
265+
Compat::new(self, Some(executor))
266+
}
251267
}

futures-util/src/try_stream/mod.rs

+16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ use core::marker::Unpin;
77
use futures_core::future::TryFuture;
88
use futures_core::stream::TryStream;
99

10+
#[cfg(feature = "compat")]
11+
use crate::compat::Compat;
12+
13+
#[cfg(feature = "compat")]
14+
use futures_core::task::Executor;
15+
1016
mod err_into;
1117
pub use self::err_into::ErrInto;
1218

@@ -363,4 +369,14 @@ pub trait TryStreamExt: TryStream {
363369
{
364370
TryBufferUnordered::new(self, n)
365371
}
372+
373+
/// Wraps a [`TryStream`] into a stream compatible with libraries using
374+
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
375+
#[cfg(feature = "compat")]
376+
fn compat<E>(self, executor: E) -> Compat<Self, E>
377+
where Self: Sized + Unpin,
378+
E: Executor,
379+
{
380+
Compat::new(self, Some(executor))
381+
}
366382
}

futures/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ pub mod compat {
9090
Executor01As03,
9191
Executor01CompatExt,
9292
Future01CompatExt,
93+
Stream01CompatExt,
9394
};
9495
}
9596

0 commit comments

Comments
 (0)