diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index 850c12dbf5..6a8eec957e 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -1,13 +1,15 @@ use futures_01::executor::{ - Spawn as Spawn01, spawn as spawn01, - UnsafeNotify as UnsafeNotify01, - Notify as Notify01, - NotifyHandle as NotifyHandle01, + spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, + Spawn as Spawn01, UnsafeNotify as UnsafeNotify01, +}; +use futures_01::{ + Async as Async01, AsyncSink as AsyncSink01, Future as Future01, + Sink as Sink01, Stream as Stream01, }; -use futures_01::{Async as Async01, Future as Future01, Stream as Stream01}; use futures_core::{task as task03, Future as Future03, Stream as Stream03}; use std::pin::Pin; use std::task::Waker; +use futures_sink::Sink as Sink03; /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite /// object to a futures 0.3-compatible version, @@ -40,7 +42,10 @@ pub trait Future01CompatExt: Future01 { /// [`Future`](futures::future::Future) /// into a futures 0.3 /// [`Future>`](futures_core::future::Future). - fn compat(self) -> Compat01As03 where Self: Sized { + fn compat(self) -> Compat01As03 + where + Self: Sized, + { Compat01As03::new(self) } } @@ -52,15 +57,31 @@ pub trait Stream01CompatExt: Stream01 { /// [`Stream`](futures::stream::Stream) /// into a futures 0.3 /// [`Stream>`](futures_core::stream::Stream). - fn compat(self) -> Compat01As03 where Self: Sized { + fn compat(self) -> Compat01As03 + where + Self: Sized, + { Compat01As03::new(self) } } impl Stream01CompatExt for St {} -fn poll_01_to_03(x: Result, E>) - -> task03::Poll> -{ +/// Extension trait for futures 0.1 [`Sink`](futures::sink::Sink) +pub trait Sink01CompatExt: Sink01 { + /// Converts a futures 0.1 + /// [`Sink`](futures::sink::Sink) + /// into a futures 0.3 + /// [`Sink`](futures_sink::sink::Sink). + fn sink_compat(self) -> Compat01As03Sink + where + Self: Sized, + { + Compat01As03Sink::new(self) + } +} +impl Sink01CompatExt for Si {} + +fn poll_01_to_03(x: Result, E>) -> task03::Poll> { match x { Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)), Ok(Async01::NotReady) => task03::Poll::Pending, @@ -95,6 +116,148 @@ impl Stream03 for Compat01As03 { } } +/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Compat01As03Sink { + pub(crate) inner: Spawn01, + pub(crate) buffer: Option, + pub(crate) close_started: bool, +} + +impl Unpin for Compat01As03Sink {} + +impl Compat01As03Sink { + /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper. + pub fn new(inner: S) -> Compat01As03Sink { + Compat01As03Sink { + inner: spawn01(inner), + buffer: None, + close_started: false + } + } + + fn in_notify( + &mut self, + waker: &Waker, + f: impl FnOnce(&mut S) -> R, + ) -> R { + let notify = &WakerToHandle(waker); + self.inner.poll_fn_notify(notify, 0, f) + } +} + +impl Stream03 for Compat01As03Sink +where + S: Stream01, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + waker: &Waker, + ) -> task03::Poll> { + match self.in_notify(waker, |f| f.poll()) { + Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))), + Ok(Async01::Ready(None)) => task03::Poll::Ready(None), + Ok(Async01::NotReady) => task03::Poll::Pending, + Err(e) => task03::Poll::Ready(Some(Err(e))), + } + } +} + +impl Sink03 for Compat01As03Sink +where + S: Sink01, +{ + type SinkItem = SinkItem; + type SinkError = S::SinkError; + + fn start_send( + mut self: Pin<&mut Self>, + item: Self::SinkItem, + ) -> Result<(), Self::SinkError> { + debug_assert!(self.buffer.is_none()); + self.buffer = Some(item); + Ok(()) + } + + fn poll_ready( + mut self: Pin<&mut Self>, + waker: &Waker, + ) -> task03::Poll> { + match self.buffer.take() { + Some(item) => match self.in_notify(waker, |f| f.start_send(item)) { + Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())), + Ok(AsyncSink01::NotReady(i)) => { + self.buffer = Some(i); + task03::Poll::Pending + } + Err(e) => task03::Poll::Ready(Err(e)), + }, + None => task03::Poll::Ready(Ok(())), + } + } + + fn poll_flush( + mut self: Pin<&mut Self>, + waker: &Waker, + ) -> task03::Poll> { + let item = self.buffer.take(); + match self.in_notify(waker, |f| match item { + Some(i) => match f.start_send(i) { + Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)), + Ok(AsyncSink01::NotReady(t)) => { + Ok((Async01::NotReady, Some(t))) + } + Err(e) => Err(e), + }, + None => f.poll_complete().map(|i| (i, None)), + }) { + Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())), + Ok((Async01::NotReady, item)) => { + self.buffer = item; + task03::Poll::Pending + } + Err(e) => task03::Poll::Ready(Err(e)), + } + } + + fn poll_close( + mut self: Pin<&mut Self>, + waker: &Waker, + ) -> task03::Poll> { + let item = self.buffer.take(); + let close_started = self.close_started; + + let result = self.in_notify(waker, |f| { + if !close_started { + if let Some(item) = item { + if let AsyncSink01::NotReady(item) = f.start_send(item)? { + return Ok((Async01::NotReady, Some(item), false)); + } + } + + if let Async01::NotReady = f.poll_complete()? { + return Ok((Async01::NotReady, None, false)); + } + } + + Ok((::close(f)?, None, true)) + }); + + match result { + Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())), + Ok((Async01::NotReady, item, close_started)) => { + self.buffer = item; + self.close_started = close_started; + task03::Poll::Pending + } + Err(e) => task03::Poll::Ready(Err(e)), + } + } +} + struct NotifyWaker(task03::Waker); #[derive(Clone)] @@ -129,9 +292,7 @@ unsafe impl UnsafeNotify01 for NotifyWaker { mod io { use super::*; use futures_io::{ - AsyncRead as AsyncRead03, - AsyncWrite as AsyncWrite03, - Initializer, + AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03, Initializer, }; use std::io::Error; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs index d2f7f44c4a..517e5d69cf 100644 --- a/futures-util/src/compat/mod.rs +++ b/futures-util/src/compat/mod.rs @@ -6,7 +6,7 @@ mod executor; pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; mod compat01as03; -pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt}; +pub use self::compat01as03::{Compat01As03, Compat01As03Sink, Future01CompatExt, Stream01CompatExt, Sink01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 90d2bc545b..42c1228b4d 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -84,11 +84,13 @@ pub mod compat { pub use futures_util::compat::{ Compat, Compat01As03, + Compat01As03Sink, Executor01Future, Executor01As03, Executor01CompatExt, Future01CompatExt, Stream01CompatExt, + Sink01CompatExt, }; }