From 59058291979bf09698e4e34a24559f721f0ac15a Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 5 Dec 2018 23:41:55 -0500 Subject: [PATCH 1/9] Add Sink01As03 compat shim This adds a compat shim to go from a 0.1 Sink to a 0.3 Sink. This follows, roughly how the stream compat works with the exception that it stores an internal buffer of one item. This is due to the fact that there is no `LocalWaker` passed to `Sink01::start_send`, so we can only actually write the item in the sink on the next `Sink03::poll_read` which gets a `LocalWaker`. --- futures-util/src/compat/compat01as03.rs | 92 ++++++++++++++++++++++++- futures-util/src/compat/mod.rs | 2 +- futures/src/lib.rs | 1 + 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index 850c12dbf5..17f7181fc8 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -4,10 +4,11 @@ use futures_01::executor::{ Notify as Notify01, NotifyHandle as NotifyHandle01, }; -use futures_01::{Async as Async01, Future as Future01, Stream as Stream01}; +use futures_01::{Async as Async01, Future as Future01, Stream as Stream01, Sink as Sink01, AsyncSink as AsyncSink01}; use futures_core::{task as task03, Future as Future03, Stream as Stream03}; use std::pin::Pin; use std::task::Waker; +use futures_sink::Sink as Sink03; /// Converts a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite /// object to a futures 0.3-compatible version, @@ -58,6 +59,18 @@ pub trait Stream01CompatExt: Stream01 { } impl Stream01CompatExt for St {} +/// Extension trait for futures 0.1 [`Sink`](futures::sink::Sink) +pub trait Sink01CompatExt: Sink01 { + /// Converts a futures 0.1 + /// [`Sink`](futures::sink::Sink) + /// into a futures 0.3 + /// [`Sink`](futures_sink::sink::Sink). + fn compat(self) -> Compat01As03Sink where Self: Sized { + Compat01As03Sink::new(self) + } +} +impl Sink01CompatExt for Si {} + fn poll_01_to_03(x: Result, E>) -> task03::Poll> { @@ -95,6 +108,83 @@ impl Stream03 for Compat01As03 { } } +/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Compat01As03Sink { + pub(crate) inner: Spawn01, + pub(crate) buffer: Option, +} + +impl Unpin for Compat01As03Sink {} + +impl Compat01As03Sink { + /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper. + pub fn new(sink: S) -> Compat01As03Sink { + Compat01As03Sink { + inner: spawn01(sink), + buffer: None, + } + } + + fn in_notify(&mut self, lw: &LocalWaker, f: impl FnOnce(&mut S) -> R) -> R { + let notify = &WakerToHandle(lw.as_waker()); + self.inner.poll_fn_notify(notify, 0, f) + } +} + +impl Sink03 for Compat01As03Sink { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(mut self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> { + self.buffer = Some(item); + Ok(()) + } + + fn poll_ready(mut self: Pin<&mut Self>, lw: &LocalWaker) -> task03::Poll>{ + match self.buffer.take() { + Some(item) => match self.in_notify(lw, |f| f.start_send(item)) { + Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())), + Ok(AsyncSink01::NotReady(i)) => { + self.buffer = Some(i); + task03::Poll::Pending + } + Err(e) => task03::Poll::Ready(Err(e)), + }, + None => task03::Poll::Ready(Ok(())) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, lw: &LocalWaker) -> task03::Poll> { + let item = self.buffer.take(); + match self.in_notify(lw, |f| { + match item { + Some(i) => match f.start_send(i) { + Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)), + Ok(AsyncSink01::NotReady(t)) => { + Ok((Async01::NotReady, Some(t))) + } + Err(e) => Err(e), + } + None => Ok((Async01::NotReady, None)) + } + }) { + Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())), + Ok((Async01::NotReady, item)) => { + self.buffer = item; + task03::Poll::Pending + } + Err(e) => task03::Poll::Ready(Err(e)), + } + } + + #[allow(unused_mut)] + fn poll_close(mut self: Pin<&mut Self>, lw: &LocalWaker) -> task03::Poll> { + self.poll_flush(lw) + } +} + struct NotifyWaker(task03::Waker); #[derive(Clone)] diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs index d2f7f44c4a..d33ba13c18 100644 --- a/futures-util/src/compat/mod.rs +++ b/futures-util/src/compat/mod.rs @@ -6,7 +6,7 @@ mod executor; pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; mod compat01as03; -pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt}; +pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt, Sink01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 90d2bc545b..630f1931b1 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -89,6 +89,7 @@ pub mod compat { Executor01CompatExt, Future01CompatExt, Stream01CompatExt, + Sink01CompatExt }; } From baf6c6b3786bbb4f7c29da4df4f1efac5acf9a98 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Sun, 9 Dec 2018 12:51:06 -0500 Subject: [PATCH 2/9] Add StreamSink01Compat and added debug_assert for start_send --- futures-util/src/compat/compat01as03.rs | 138 +++++++++++++++++------- futures-util/src/compat/mod.rs | 2 +- futures/src/lib.rs | 3 +- 3 files changed, 102 insertions(+), 41 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index 17f7181fc8..be427e8aa4 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -1,10 +1,11 @@ use futures_01::executor::{ - Spawn as Spawn01, spawn as spawn01, - UnsafeNotify as UnsafeNotify01, - Notify as Notify01, - NotifyHandle as NotifyHandle01, + spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, + Spawn as Spawn01, UnsafeNotify as UnsafeNotify01, +}; +use futures_01::{ + Async as Async01, AsyncSink as AsyncSink01, Future as Future01, + Sink as Sink01, Stream as Stream01, }; -use futures_01::{Async as Async01, Future as Future01, Stream as Stream01, Sink as Sink01, AsyncSink as AsyncSink01}; use futures_core::{task as task03, Future as Future03, Stream as Stream03}; use std::pin::Pin; use std::task::Waker; @@ -41,7 +42,10 @@ pub trait Future01CompatExt: Future01 { /// [`Future`](futures::future::Future) /// into a futures 0.3 /// [`Future>`](futures_core::future::Future). - fn compat(self) -> Compat01As03 where Self: Sized { + fn compat(self) -> Compat01As03 + where + Self: Sized, + { Compat01As03::new(self) } } @@ -53,7 +57,10 @@ pub trait Stream01CompatExt: Stream01 { /// [`Stream`](futures::stream::Stream) /// into a futures 0.3 /// [`Stream>`](futures_core::stream::Stream). - fn compat(self) -> Compat01As03 where Self: Sized { + fn compat(self) -> Compat01As03 + where + Self: Sized, + { Compat01As03::new(self) } } @@ -65,15 +72,34 @@ pub trait Sink01CompatExt: Sink01 { /// [`Sink`](futures::sink::Sink) /// into a futures 0.3 /// [`Sink`](futures_sink::sink::Sink). - fn compat(self) -> Compat01As03Sink where Self: Sized { + fn compat(self) -> Compat01As03Sink + where + Self: Sized, + { Compat01As03Sink::new(self) } } impl Sink01CompatExt for Si {} -fn poll_01_to_03(x: Result, E>) - -> task03::Poll> -{ +/// Extension trait for futures 0.1 [`Stream`](futures::stream::Stream) and +/// [`Sink`](futures::sink::Sink) combos +pub trait StreamSink01CompatExt: Stream01 + Sink01 { + /// Converts a futures 0.1 + /// [`Stream`](futures::sink::Sink) + /// into a futures 0.3 + /// [`Stream`](futures_core::stream::Stream) + + /// [`Sink`](futures_sink::sink::Sink). + fn compat(self) -> Compat01As03Sink::SinkItem> + where + Self: Sized, + { + Compat01As03Sink::new(self) + } +} +impl StreamSink01CompatExt for S {} + +fn poll_01_to_03(x: Result, E>) -> task03::Poll> { match x { Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)), Ok(Async01::NotReady) => task03::Poll::Pending, @@ -111,38 +137,71 @@ impl Stream03 for Compat01As03 { /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version #[derive(Debug)] #[must_use = "futures do nothing unless polled"] -pub struct Compat01As03Sink { +pub struct Compat01As03Sink { pub(crate) inner: Spawn01, - pub(crate) buffer: Option, + pub(crate) buffer: Option, } -impl Unpin for Compat01As03Sink {} +impl Unpin for Compat01As03Sink {} -impl Compat01As03Sink { +impl Compat01As03Sink { /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper. - pub fn new(sink: S) -> Compat01As03Sink { + pub fn new(inner: S) -> Compat01As03Sink { Compat01As03Sink { - inner: spawn01(sink), + inner: spawn01(inner), buffer: None, } } - fn in_notify(&mut self, lw: &LocalWaker, f: impl FnOnce(&mut S) -> R) -> R { + fn in_notify( + &mut self, + lw: &LocalWaker, + f: impl FnOnce(&mut S) -> R, + ) -> R { let notify = &WakerToHandle(lw.as_waker()); self.inner.poll_fn_notify(notify, 0, f) } } -impl Sink03 for Compat01As03Sink { - type SinkItem = S::SinkItem; +impl Stream03 for Compat01As03Sink +where + S: Stream01, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + lw: &LocalWaker, + ) -> task03::Poll> { + match self.in_notify(lw, |f| f.poll()) { + Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))), + Ok(Async01::Ready(None)) => task03::Poll::Ready(None), + Ok(Async01::NotReady) => task03::Poll::Pending, + Err(e) => task03::Poll::Ready(Some(Err(e))), + } + } +} + +impl Sink03 for Compat01As03Sink +where + S: Sink01, +{ + type SinkItem = SinkItem; type SinkError = S::SinkError; - fn start_send(mut self: Pin<&mut Self>, item: Self::SinkItem) -> Result<(), Self::SinkError> { + fn start_send( + mut self: Pin<&mut Self>, + item: Self::SinkItem, + ) -> Result<(), Self::SinkError> { + debug_assert!(self.buffer.is_none()); self.buffer = Some(item); Ok(()) } - fn poll_ready(mut self: Pin<&mut Self>, lw: &LocalWaker) -> task03::Poll>{ + fn poll_ready( + mut self: Pin<&mut Self>, + lw: &LocalWaker, + ) -> task03::Poll> { match self.buffer.take() { Some(item) => match self.in_notify(lw, |f| f.start_send(item)) { Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())), @@ -152,23 +211,24 @@ impl Sink03 for Compat01As03Sink { } Err(e) => task03::Poll::Ready(Err(e)), }, - None => task03::Poll::Ready(Ok(())) + None => task03::Poll::Ready(Ok(())), } } - fn poll_flush(mut self: Pin<&mut Self>, lw: &LocalWaker) -> task03::Poll> { + fn poll_flush( + mut self: Pin<&mut Self>, + lw: &LocalWaker, + ) -> task03::Poll> { let item = self.buffer.take(); - match self.in_notify(lw, |f| { - match item { - Some(i) => match f.start_send(i) { - Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)), - Ok(AsyncSink01::NotReady(t)) => { - Ok((Async01::NotReady, Some(t))) - } - Err(e) => Err(e), + match self.in_notify(lw, |f| match item { + Some(i) => match f.start_send(i) { + Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)), + Ok(AsyncSink01::NotReady(t)) => { + Ok((Async01::NotReady, Some(t))) } - None => Ok((Async01::NotReady, None)) - } + Err(e) => Err(e), + }, + None => Ok((Async01::NotReady, None)), }) { Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())), Ok((Async01::NotReady, item)) => { @@ -179,8 +239,10 @@ impl Sink03 for Compat01As03Sink { } } - #[allow(unused_mut)] - fn poll_close(mut self: Pin<&mut Self>, lw: &LocalWaker) -> task03::Poll> { + fn poll_close( + self: Pin<&mut Self>, + lw: &LocalWaker, + ) -> task03::Poll> { self.poll_flush(lw) } } @@ -219,9 +281,7 @@ unsafe impl UnsafeNotify01 for NotifyWaker { mod io { use super::*; use futures_io::{ - AsyncRead as AsyncRead03, - AsyncWrite as AsyncWrite03, - Initializer, + AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03, Initializer, }; use std::io::Error; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs index d33ba13c18..339d573d6e 100644 --- a/futures-util/src/compat/mod.rs +++ b/futures-util/src/compat/mod.rs @@ -6,7 +6,7 @@ mod executor; pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; mod compat01as03; -pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt, Sink01CompatExt}; +pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt, Sink01CompatExt, StreamSink01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 630f1931b1..bf836b3c63 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -89,7 +89,8 @@ pub mod compat { Executor01CompatExt, Future01CompatExt, Stream01CompatExt, - Sink01CompatExt + Sink01CompatExt, + StreamSink01CompatExt, }; } From 5c3af5744aa918d618d8021be4b3a3b6bce1671d Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Sun, 9 Dec 2018 15:18:57 -0500 Subject: [PATCH 3/9] Remove StreamSink01Compat and changed compat for sink to compat_sink --- futures-util/src/compat/compat01as03.rs | 20 +------------------- futures-util/src/compat/mod.rs | 2 +- futures/src/lib.rs | 1 - 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index be427e8aa4..b7f8af35fd 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -72,7 +72,7 @@ pub trait Sink01CompatExt: Sink01 { /// [`Sink`](futures::sink::Sink) /// into a futures 0.3 /// [`Sink`](futures_sink::sink::Sink). - fn compat(self) -> Compat01As03Sink + fn compat_sink(self) -> Compat01As03Sink where Self: Sized, { @@ -81,24 +81,6 @@ pub trait Sink01CompatExt: Sink01 { } impl Sink01CompatExt for Si {} -/// Extension trait for futures 0.1 [`Stream`](futures::stream::Stream) and -/// [`Sink`](futures::sink::Sink) combos -pub trait StreamSink01CompatExt: Stream01 + Sink01 { - /// Converts a futures 0.1 - /// [`Stream`](futures::sink::Sink) - /// into a futures 0.3 - /// [`Stream`](futures_core::stream::Stream) + - /// [`Sink`](futures_sink::sink::Sink). - fn compat(self) -> Compat01As03Sink::SinkItem> - where - Self: Sized, - { - Compat01As03Sink::new(self) - } -} -impl StreamSink01CompatExt for S {} - fn poll_01_to_03(x: Result, E>) -> task03::Poll> { match x { Ok(Async01::Ready(t)) => task03::Poll::Ready(Ok(t)), diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs index 339d573d6e..d33ba13c18 100644 --- a/futures-util/src/compat/mod.rs +++ b/futures-util/src/compat/mod.rs @@ -6,7 +6,7 @@ mod executor; pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; mod compat01as03; -pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt, Sink01CompatExt, StreamSink01CompatExt}; +pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt, Sink01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index bf836b3c63..c75647f9fa 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -90,7 +90,6 @@ pub mod compat { Future01CompatExt, Stream01CompatExt, Sink01CompatExt, - StreamSink01CompatExt, }; } From 951796f74284bd75a0c2f7e5f7478ccb225f082b Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 14 Dec 2018 12:43:50 -0500 Subject: [PATCH 4/9] Fix poll_close for Sink01As03 Compat --- futures-util/src/compat/compat01as03.rs | 43 +++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index b7f8af35fd..6d55277834 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -210,7 +210,7 @@ where } Err(e) => Err(e), }, - None => Ok((Async01::NotReady, None)), + None => f.poll_complete().map(|i| (i, None)), }) { Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())), Ok((Async01::NotReady, item)) => { @@ -222,10 +222,47 @@ where } fn poll_close( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, lw: &LocalWaker, ) -> task03::Poll> { - self.poll_flush(lw) + let item = self.buffer.take(); + match self.in_notify(lw, |f| match item { + Some(i) => match f.start_send(i) { + Ok(AsyncSink01::Ready) => { + match f.poll_complete() { + Ok(Async01::Ready(_)) => { + match ::close(f) { + Ok(i) => Ok((i, None)), + Err(e) => Err(e) + } + }, + Ok(Async01::NotReady) => Ok((Async01::NotReady, None)), + Err(e) => Err(e) + } + }, + Ok(AsyncSink01::NotReady(t)) => { + Ok((Async01::NotReady, Some(t))) + } + Err(e) => Err(e), + }, + None => match f.poll_complete() { + Ok(Async01::Ready(_)) => { + match ::close(f) { + Ok(i) => Ok((i, None)), + Err(e) => Err(e) + } + }, + Ok(Async01::NotReady) => Ok((Async01::NotReady, None)), + Err(e) => Err(e) + }, + }) { + Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())), + Ok((Async01::NotReady, item)) => { + self.buffer = item; + task03::Poll::Pending + } + Err(e) => task03::Poll::Ready(Err(e)), + } } } From ba839b7fe23de320b1b86a51535440068437b720 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 14 Dec 2018 13:36:37 -0500 Subject: [PATCH 5/9] Don't call poll_complete once Sink01::close has been called --- futures-util/src/compat/compat01as03.rs | 40 ++++++++++++++++--------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index 6d55277834..a461ff6f23 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -122,6 +122,7 @@ impl Stream03 for Compat01As03 { pub struct Compat01As03Sink { pub(crate) inner: Spawn01, pub(crate) buffer: Option, + pub(crate) close_started: bool, } impl Unpin for Compat01As03Sink {} @@ -132,6 +133,7 @@ impl Compat01As03Sink { Compat01As03Sink { inner: spawn01(inner), buffer: None, + close_started: false } } @@ -226,39 +228,49 @@ where lw: &LocalWaker, ) -> task03::Poll> { let item = self.buffer.take(); + let close_started = self.close_started; + match self.in_notify(lw, |f| match item { Some(i) => match f.start_send(i) { Ok(AsyncSink01::Ready) => { match f.poll_complete() { Ok(Async01::Ready(_)) => { match ::close(f) { - Ok(i) => Ok((i, None)), + Ok(i) => Ok((i, None, true)), Err(e) => Err(e) } }, - Ok(Async01::NotReady) => Ok((Async01::NotReady, None)), + Ok(Async01::NotReady) => Ok((Async01::NotReady, None, false)), Err(e) => Err(e) } }, Ok(AsyncSink01::NotReady(t)) => { - Ok((Async01::NotReady, Some(t))) + Ok((Async01::NotReady, Some(t), close_started)) } Err(e) => Err(e), }, - None => match f.poll_complete() { - Ok(Async01::Ready(_)) => { - match ::close(f) { - Ok(i) => Ok((i, None)), - Err(e) => Err(e) - } - }, - Ok(Async01::NotReady) => Ok((Async01::NotReady, None)), - Err(e) => Err(e) + None => if close_started { + match ::close(f) { + Ok(i) => Ok((i, None, true)), + Err(e) => Err(e) + } + } else { + match f.poll_complete() { + Ok(Async01::Ready(_)) => { + match ::close(f) { + Ok(i) => Ok((i, None, true)), + Err(e) => Err(e) + } + }, + Ok(Async01::NotReady) => Ok((Async01::NotReady, None, close_started)), + Err(e) => Err(e) + } }, }) { - Ok((Async01::Ready(_), _)) => task03::Poll::Ready(Ok(())), - Ok((Async01::NotReady, item)) => { + Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())), + Ok((Async01::NotReady, item, close_started)) => { self.buffer = item; + self.close_started = close_started; task03::Poll::Pending } Err(e) => task03::Poll::Ready(Err(e)), From 6d43f7ae7a015913e452f0fbe5f34888149e3348 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 1 Feb 2019 20:04:41 -0500 Subject: [PATCH 6/9] Clean up poll_close for SinkCompat layer --- futures-util/src/compat/compat01as03.rs | 50 ++++++++----------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index a461ff6f23..e2601573de 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -118,7 +118,7 @@ impl Stream03 for Compat01As03 { /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version #[derive(Debug)] -#[must_use = "futures do nothing unless polled"] +#[must_use = "sinks do nothing unless polled"] pub struct Compat01As03Sink { pub(crate) inner: Spawn01, pub(crate) buffer: Option, @@ -230,43 +230,23 @@ where let item = self.buffer.take(); let close_started = self.close_started; - match self.in_notify(lw, |f| match item { - Some(i) => match f.start_send(i) { - Ok(AsyncSink01::Ready) => { - match f.poll_complete() { - Ok(Async01::Ready(_)) => { - match ::close(f) { - Ok(i) => Ok((i, None, true)), - Err(e) => Err(e) - } - }, - Ok(Async01::NotReady) => Ok((Async01::NotReady, None, false)), - Err(e) => Err(e) + let result = self.in_notify(lw, |f| { + if !close_started { + if let Some(item) = item { + if let AsyncSink01::NotReady(item) = f.start_send(item)? { + return Ok((Async01::NotReady, Some(item), false)); } - }, - Ok(AsyncSink01::NotReady(t)) => { - Ok((Async01::NotReady, Some(t), close_started)) - } - Err(e) => Err(e), - }, - None => if close_started { - match ::close(f) { - Ok(i) => Ok((i, None, true)), - Err(e) => Err(e) } - } else { - match f.poll_complete() { - Ok(Async01::Ready(_)) => { - match ::close(f) { - Ok(i) => Ok((i, None, true)), - Err(e) => Err(e) - } - }, - Ok(Async01::NotReady) => Ok((Async01::NotReady, None, close_started)), - Err(e) => Err(e) + + if let Async01::NotReady = f.poll_complete()? { + return Ok((Async01::NotReady, None, false)); } - }, - }) { + } + + Ok((::close(f)?, None, true)) + }); + + match result { Ok((Async01::Ready(_), _, _)) => task03::Poll::Ready(Ok(())), Ok((Async01::NotReady, item, close_started)) => { self.buffer = item; From 658d1b8be905427750c552680d4b86b3fb3e2a88 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Sun, 3 Feb 2019 12:23:35 -0500 Subject: [PATCH 7/9] Rename compat_sink to sink_compat --- futures-util/src/compat/compat01as03.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index e2601573de..f0afa52e13 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -72,7 +72,7 @@ pub trait Sink01CompatExt: Sink01 { /// [`Sink`](futures::sink::Sink) /// into a futures 0.3 /// [`Sink`](futures_sink::sink::Sink). - fn compat_sink(self) -> Compat01As03Sink + fn sink_compat(self) -> Compat01As03Sink where Self: Sized, { From 9c12c96fd0285ee53add6ed7aef52c1762f67322 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 8 Feb 2019 12:52:39 -0500 Subject: [PATCH 8/9] Add pub export of Compat01As03Sink --- futures-util/src/compat/mod.rs | 2 +- futures/src/lib.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/futures-util/src/compat/mod.rs b/futures-util/src/compat/mod.rs index d33ba13c18..517e5d69cf 100644 --- a/futures-util/src/compat/mod.rs +++ b/futures-util/src/compat/mod.rs @@ -6,7 +6,7 @@ mod executor; pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; mod compat01as03; -pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt, Sink01CompatExt}; +pub use self::compat01as03::{Compat01As03, Compat01As03Sink, Future01CompatExt, Stream01CompatExt, Sink01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index c75647f9fa..42c1228b4d 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -84,6 +84,7 @@ pub mod compat { pub use futures_util::compat::{ Compat, Compat01As03, + Compat01As03Sink, Executor01Future, Executor01As03, Executor01CompatExt, From a70d0df0751873f56537ade3bff93d2f8c04120c Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 28 Feb 2019 17:03:18 -0500 Subject: [PATCH 9/9] Fix last remaining localwaker references --- futures-util/src/compat/compat01as03.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index f0afa52e13..6a8eec957e 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -139,10 +139,10 @@ impl Compat01As03Sink { fn in_notify( &mut self, - lw: &LocalWaker, + waker: &Waker, f: impl FnOnce(&mut S) -> R, ) -> R { - let notify = &WakerToHandle(lw.as_waker()); + let notify = &WakerToHandle(waker); self.inner.poll_fn_notify(notify, 0, f) } } @@ -155,9 +155,9 @@ where fn poll_next( mut self: Pin<&mut Self>, - lw: &LocalWaker, + waker: &Waker, ) -> task03::Poll> { - match self.in_notify(lw, |f| f.poll()) { + match self.in_notify(waker, |f| f.poll()) { Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))), Ok(Async01::Ready(None)) => task03::Poll::Ready(None), Ok(Async01::NotReady) => task03::Poll::Pending, @@ -184,10 +184,10 @@ where fn poll_ready( mut self: Pin<&mut Self>, - lw: &LocalWaker, + waker: &Waker, ) -> task03::Poll> { match self.buffer.take() { - Some(item) => match self.in_notify(lw, |f| f.start_send(item)) { + Some(item) => match self.in_notify(waker, |f| f.start_send(item)) { Ok(AsyncSink01::Ready) => task03::Poll::Ready(Ok(())), Ok(AsyncSink01::NotReady(i)) => { self.buffer = Some(i); @@ -201,10 +201,10 @@ where fn poll_flush( mut self: Pin<&mut Self>, - lw: &LocalWaker, + waker: &Waker, ) -> task03::Poll> { let item = self.buffer.take(); - match self.in_notify(lw, |f| match item { + match self.in_notify(waker, |f| match item { Some(i) => match f.start_send(i) { Ok(AsyncSink01::Ready) => f.poll_complete().map(|i| (i, None)), Ok(AsyncSink01::NotReady(t)) => { @@ -225,12 +225,12 @@ where fn poll_close( mut self: Pin<&mut Self>, - lw: &LocalWaker, + waker: &Waker, ) -> task03::Poll> { let item = self.buffer.take(); let close_started = self.close_started; - let result = self.in_notify(lw, |f| { + let result = self.in_notify(waker, |f| { if !close_started { if let Some(item) = item { if let AsyncSink01::NotReady(item) = f.start_send(item)? {