Skip to content

Add Sink01As03 compat shim #1364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 28, 2019
187 changes: 174 additions & 13 deletions futures-util/src/compat/compat01as03.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use futures_01::executor::{
Spawn as Spawn01, spawn as spawn01,
UnsafeNotify as UnsafeNotify01,
Notify as Notify01,
NotifyHandle as NotifyHandle01,
spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01,
Spawn as Spawn01, UnsafeNotify as UnsafeNotify01,
};
use futures_01::{
Async as Async01, AsyncSink as AsyncSink01, Future as Future01,
Sink as Sink01, Stream as Stream01,
};
use futures_01::{Async as Async01, Future as Future01, Stream as Stream01};
use futures_core::{task as task03, Future as Future03, Stream as Stream03};
use std::pin::Pin;
use std::task::Waker;
use futures_sink::Sink as Sink03;

/// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
/// object to a futures 0.3-compatible version,
Expand Down Expand Up @@ -40,7 +42,10 @@ pub trait Future01CompatExt: Future01 {
/// [`Future<Item = T, Error = E>`](futures::future::Future)
/// into a futures 0.3
/// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
fn compat(self) -> Compat01As03<Self> where Self: Sized {
fn compat(self) -> Compat01As03<Self>
where
Self: Sized,
{
Compat01As03::new(self)
}
}
Expand All @@ -52,15 +57,31 @@ pub trait Stream01CompatExt: Stream01 {
/// [`Stream<Item = T, Error = E>`](futures::stream::Stream)
/// into a futures 0.3
/// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
fn compat(self) -> Compat01As03<Self> where Self: Sized {
fn compat(self) -> Compat01As03<Self>
where
Self: Sized,
{
Compat01As03::new(self)
}
}
impl<St: Stream01> Stream01CompatExt for St {}

fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>)
-> task03::Poll<Result<T, E>>
{
/// Extension trait for futures 0.1 [`Sink`](futures::sink::Sink)
pub trait Sink01CompatExt: Sink01 {
/// Converts a futures 0.1
/// [`Sink<SinkItem = T, SinkError = E>`](futures::sink::Sink)
/// into a futures 0.3
/// [`Sink<SinkItem = T, SinkError = E>`](futures_sink::sink::Sink).
fn sink_compat(self) -> Compat01As03Sink<Self, Self::SinkItem>
where
Self: Sized,
{
Compat01As03Sink::new(self)
}
}
impl<Si: Sink01> Sink01CompatExt for Si {}

fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
match x {
Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)),
Ok(Async01::NotReady) => task03::Poll::Pending,
Expand Down Expand Up @@ -95,6 +116,148 @@ impl<St: Stream01> Stream03 for Compat01As03<St> {
}
}

/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Compat01As03Sink<S, SinkItem> {
pub(crate) inner: Spawn01<S>,
pub(crate) buffer: Option<SinkItem>,
pub(crate) close_started: bool,
}

impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}

impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
/// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
pub fn new(inner: S) -> Compat01As03Sink<S, SinkItem> {
Compat01As03Sink {
inner: spawn01(inner),
buffer: None,
close_started: false
}
}

fn in_notify<R>(
&mut self,
waker: &Waker,
f: impl FnOnce(&mut S) -> R,
) -> R {
let notify = &WakerToHandle(waker);
self.inner.poll_fn_notify(notify, 0, f)
}
}

impl<S, SinkItem> Stream03 for Compat01As03Sink<S, SinkItem>
where
S: Stream01,
{
type Item = Result<S::Item, S::Error>;

fn poll_next(
mut self: Pin<&mut Self>,
waker: &Waker,
) -> task03::Poll<Option<Self::Item>> {
match self.in_notify(waker, |f| f.poll()) {
Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))),
Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
Ok(Async01::NotReady) => task03::Poll::Pending,
Err(e) => task03::Poll::Ready(Some(Err(e))),
}
}
}

impl<S, SinkItem> Sink03 for Compat01As03Sink<S, SinkItem>
where
S: Sink01<SinkItem = SinkItem>,
{
type SinkItem = SinkItem;
type SinkError = S::SinkError;

fn start_send(
mut self: Pin<&mut Self>,
item: Self::SinkItem,
) -> Result<(), Self::SinkError> {
debug_assert!(self.buffer.is_none());
self.buffer = Some(item);
Ok(())
}

fn poll_ready(
mut self: Pin<&mut Self>,
waker: &Waker,
) -> task03::Poll<Result<(), Self::SinkError>> {
match self.buffer.take() {
Some(item) => match self.in_notify(waker, |f| f.start_send(item)) {
Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())),
Ok(AsyncSink01::NotReady(i)) => {
self.buffer = Some(i);
task03::Poll::Pending
}
Err(e) => task03::Poll::Ready(Err(e)),
},
None => task03::Poll::Ready(Ok(())),
}
}

fn poll_flush(
mut self: Pin<&mut Self>,
waker: &Waker,
) -> task03::Poll<Result<(), Self::SinkError>> {
let item = self.buffer.take();
match self.in_notify(waker, |f| match item {
Some(i) => match f.start_send(i) {
Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)),
Ok(AsyncSink01::NotReady(t)) => {
Ok((Async01::NotReady, Some(t)))
}
Err(e) => Err(e),
},
None => f.poll_complete().map(|i| (i, None)),
}) {
Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())),
Ok((Async01::NotReady, item)) => {
self.buffer = item;
task03::Poll::Pending
}
Err(e) => task03::Poll::Ready(Err(e)),
}
}

fn poll_close(
mut self: Pin<&mut Self>,
waker: &Waker,
) -> task03::Poll<Result<(), Self::SinkError>> {
let item = self.buffer.take();
let close_started = self.close_started;

let result = self.in_notify(waker, |f| {
if !close_started {
if let Some(item) = item {
if let AsyncSink01::NotReady(item) = f.start_send(item)? {
return Ok((Async01::NotReady, Some(item), false));
}
}

if let Async01::NotReady = f.poll_complete()? {
return Ok((Async01::NotReady, None, false));
}
}

Ok((<S as Sink01>::close(f)?, None, true))
});

match result {
Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())),
Ok((Async01::NotReady, item, close_started)) => {
self.buffer = item;
self.close_started = close_started;
task03::Poll::Pending
}
Err(e) => task03::Poll::Ready(Err(e)),
}
}
}

struct NotifyWaker(task03::Waker);

#[derive(Clone)]
Expand Down Expand Up @@ -129,9 +292,7 @@ unsafe impl UnsafeNotify01 for NotifyWaker {
mod io {
use super::*;
use futures_io::{
AsyncRead as AsyncRead03,
AsyncWrite as AsyncWrite03,
Initializer,
AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03, Initializer,
};
use std::io::Error;
use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/compat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod executor;
pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03};

mod compat01as03;
pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt};
pub use self::compat01as03::{Compat01As03, Compat01As03Sink, Future01CompatExt, Stream01CompatExt, Sink01CompatExt};

mod compat03as01;
pub use self::compat03as01::Compat;
2 changes: 2 additions & 0 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@ pub mod compat {
pub use futures_util::compat::{
Compat,
Compat01As03,
Compat01As03Sink,
Executor01Future,
Executor01As03,
Executor01CompatExt,
Future01CompatExt,
Stream01CompatExt,
Sink01CompatExt,
};
}

Expand Down