From d48040837ed04af828311fde5e4d9e8ba5bdcf7b Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Fri, 14 Jun 2019 20:08:10 +0200 Subject: [PATCH 1/4] Add AsyncBufReadExt::copy_buf_into --- futures-util/src/io/copy_buf_into.rs | 58 ++++++++++++++++++++++++++++ futures-util/src/io/mod.rs | 37 ++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 futures-util/src/io/copy_buf_into.rs diff --git a/futures-util/src/io/copy_buf_into.rs b/futures-util/src/io/copy_buf_into.rs new file mode 100644 index 0000000000..0ceaec8e0e --- /dev/null +++ b/futures-util/src/io/copy_buf_into.rs @@ -0,0 +1,58 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use std::io; +use std::pin::Pin; + +/// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct CopyBufInto { + reader: R, + writer: W, + amt: u64, +} + +impl Unpin for CopyBufInto {} + +impl CopyBufInto { + pub(super) fn new(reader: R, writer: W) -> Self { + CopyBufInto { + reader, + writer, + amt: 0, + } + } + + fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut R>, Pin<&'a mut W>, &'a mut u64) { + unsafe { + let this = self.get_unchecked_mut(); + (Pin::new_unchecked(&mut this.reader), Pin::new_unchecked(&mut this.writer), &mut this.amt) + } + } +} + +impl Future for CopyBufInto + where R: AsyncBufRead, + W: AsyncWrite, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (mut reader, mut writer, amt) = self.project(); + loop { + let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(writer.as_mut().poll_flush(cx))?; + return Poll::Ready(Ok(*amt)); + } + + let i = ready!(writer.as_mut().poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + } + *amt += i as u64; + reader.as_mut().consume(i); + } + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 1840df75fe..0fe4df508a 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -31,6 +31,9 @@ pub use self::buf_writer::BufWriter; mod copy_into; pub use self::copy_into::CopyInto; +mod copy_buf_into; +pub use self::copy_buf_into::CopyBufInto; + mod flush; pub use self::flush::Flush; @@ -441,6 +444,40 @@ impl AsyncSeekExt for S {} /// An extension trait which adds utility methods to `AsyncBufRead` types. pub trait AsyncBufReadExt: AsyncBufRead { + /// Creates a future which copies all the bytes from one object to another. + /// + /// The returned future will copy all the bytes read from this `AsyncBufRead` into the + /// `writer` specified. This future will only complete once the `reader` has hit + /// EOF and all bytes have been written to and flushed from the `writer` + /// provided. + /// + /// On success the number of bytes is returned. + /// + /// Note that this method consumes `writer` but does not close it, you will likely want to pass + /// it by reference as shown in the example. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncBufReadExt, AsyncWriteExt}; + /// use std::io::Cursor; + /// + /// let reader = Cursor::new([1, 2, 3, 4]); + /// let mut writer = Cursor::new([0u8; 5]); + /// + /// let bytes = reader.copy_buf_into(&mut writer).await?; + /// writer.close().await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn copy_buf_into(self, writer: W) -> CopyBufInto where Self: Sized { + CopyBufInto::new(self, writer) + } + /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until the delimiter `byte` or EOF is reached. /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). From 80021904e159c24d92b32227f8fa49d13c800058 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Fri, 14 Jun 2019 20:39:40 +0200 Subject: [PATCH 2/4] Redefine AsyncReadExt::copy_into on top of AsyncBufReadExt::copy_buf_into --- futures-util/src/io/copy_into.rs | 69 ++++++-------------------------- futures-util/src/io/mod.rs | 19 +++++---- 2 files changed, 22 insertions(+), 66 deletions(-) diff --git a/futures-util/src/io/copy_into.rs b/futures-util/src/io/copy_into.rs index 629a7783c3..a14460919d 100644 --- a/futures-util/src/io/copy_into.rs +++ b/futures-util/src/io/copy_into.rs @@ -3,75 +3,32 @@ use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite}; use std::io; use std::pin::Pin; +use super::{BufReader, CopyBufInto}; +use pin_utils::unsafe_pinned; /// Future for the [`copy_into`](super::AsyncReadExt::copy_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyInto<'a, R: ?Sized + Unpin, W: ?Sized + Unpin> { - reader: &'a mut R, - read_done: bool, - writer: &'a mut W, - pos: usize, - cap: usize, - amt: u64, - buf: Box<[u8]>, +pub struct CopyInto { + inner: CopyBufInto, W>, } -impl Unpin for CopyInto<'_, R, W> {} +impl Unpin for CopyInto where CopyBufInto, W>: Unpin {} -impl<'a, R: ?Sized + Unpin, W: ?Sized + Unpin> CopyInto<'a, R, W> { - pub(super) fn new(reader: &'a mut R, writer: &'a mut W) -> Self { +impl CopyInto { + unsafe_pinned!(inner: CopyBufInto, W>); + + pub(super) fn new(reader: R, writer: W) -> Self { CopyInto { - reader, - read_done: false, - writer, - amt: 0, - pos: 0, - cap: 0, - buf: Box::new([0; 2048]), + inner: CopyBufInto::new(BufReader::new(reader), writer), } } } -impl Future for CopyInto<'_, R, W> - where R: AsyncRead + ?Sized + Unpin, - W: AsyncWrite + ?Sized + Unpin, -{ +impl Future for CopyInto { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - loop { - // If our buffer is empty, then we need to read some data to - // continue. - if this.pos == this.cap && !this.read_done { - let n = ready!(Pin::new(&mut this.reader).poll_read(cx, &mut this.buf))?; - if n == 0 { - this.read_done = true; - } else { - this.pos = 0; - this.cap = n; - } - } - - // If our buffer has some data, let's write it out! - while this.pos < this.cap { - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buf[this.pos..this.cap]))?; - if i == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } else { - this.pos += i; - this.amt += i as u64; - } - } - - // If we've written al the data and we've seen EOF, flush out the - // data and finish the transfer. - // done with the entire transfer. - if this.pos == this.cap && this.read_done { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; - return Poll::Ready(Ok(this.amt)); - } - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner().poll(cx) } } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 0fe4df508a..c1b95d5d4e 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -93,29 +93,28 @@ pub trait AsyncReadExt: AsyncRead { /// /// On success the number of bytes is returned. /// + /// Note that this method consumes `writer` but does not close it, you will likely want to pass + /// it by reference as shown in the example. + /// /// # Examples /// /// ``` /// #![feature(async_await)] /// # futures::executor::block_on(async { - /// use futures::io::AsyncReadExt; + /// use futures::io::{AsyncReadExt, AsyncWriteExt}; /// use std::io::Cursor; /// - /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let reader = Cursor::new([1, 2, 3, 4]); /// let mut writer = Cursor::new([0u8; 5]); /// /// let bytes = reader.copy_into(&mut writer).await?; + /// writer.close().await?; /// /// assert_eq!(bytes, 4); /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_into<'a, W>( - &'a mut self, - writer: &'a mut W, - ) -> CopyInto<'a, Self, W> - where Self: Unpin, W: AsyncWrite + Unpin, - { + fn copy_into(self, writer: W) -> CopyInto where Self: Sized { CopyInto::new(self, writer) } @@ -256,12 +255,12 @@ pub trait AsyncReadExt: AsyncRead { /// // seek position. This may or may not be true for other types that /// // implement both `AsyncRead` and `AsyncWrite`. /// - /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let reader = Cursor::new([1, 2, 3, 4]); /// let mut buffer = Cursor::new([0, 0, 0, 0, 5, 6, 7, 8]); /// let mut writer = Cursor::new([0u8; 5]); /// /// { - /// let (mut buffer_reader, mut buffer_writer) = (&mut buffer).split(); + /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split(); /// reader.copy_into(&mut buffer_writer).await?; /// buffer_reader.copy_into(&mut writer).await?; /// } From 8e6c53a2deae9cb7cc802429b1e12a9b2f51044a Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Tue, 25 Jun 2019 19:46:37 +0200 Subject: [PATCH 3/4] Change copy_into/copy_buf_into back to taking writer by reference --- futures-util/src/io/copy_buf_into.rs | 20 +++++++++++--------- futures-util/src/io/copy_into.rs | 14 +++++++------- futures-util/src/io/mod.rs | 10 ++-------- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/futures-util/src/io/copy_buf_into.rs b/futures-util/src/io/copy_buf_into.rs index 0ceaec8e0e..e9bd2baa35 100644 --- a/futures-util/src/io/copy_buf_into.rs +++ b/futures-util/src/io/copy_buf_into.rs @@ -7,34 +7,36 @@ use std::pin::Pin; /// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyBufInto { +pub struct CopyBufInto<'a, R, W> { reader: R, - writer: W, + writer: &'a mut W, amt: u64, } -impl Unpin for CopyBufInto {} +impl Unpin for CopyBufInto<'_, R, W> {} -impl CopyBufInto { - pub(super) fn new(reader: R, writer: W) -> Self { +impl CopyBufInto<'_, R, W> { + pub(super) fn new(reader: R, writer: &mut W) -> CopyBufInto<'_, R, W> { CopyBufInto { reader, writer, amt: 0, } } +} - fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut R>, Pin<&'a mut W>, &'a mut u64) { +impl CopyBufInto<'_, R, W> { + fn project<'b>(self: Pin<&'b mut Self>) -> (Pin<&'b mut R>, Pin<&'b mut W>, &'b mut u64) { unsafe { let this = self.get_unchecked_mut(); - (Pin::new_unchecked(&mut this.reader), Pin::new_unchecked(&mut this.writer), &mut this.amt) + (Pin::new_unchecked(&mut this.reader), Pin::new(&mut *this.writer), &mut this.amt) } } } -impl Future for CopyBufInto +impl Future for CopyBufInto<'_, R, W> where R: AsyncBufRead, - W: AsyncWrite, + W: AsyncWrite + Unpin, { type Output = io::Result; diff --git a/futures-util/src/io/copy_into.rs b/futures-util/src/io/copy_into.rs index a14460919d..df13706b5a 100644 --- a/futures-util/src/io/copy_into.rs +++ b/futures-util/src/io/copy_into.rs @@ -9,23 +9,23 @@ use pin_utils::unsafe_pinned; /// Future for the [`copy_into`](super::AsyncReadExt::copy_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyInto { - inner: CopyBufInto, W>, +pub struct CopyInto<'a, R: AsyncRead, W> { + inner: CopyBufInto<'a, BufReader, W>, } -impl Unpin for CopyInto where CopyBufInto, W>: Unpin {} +impl<'a, R: AsyncRead, W> Unpin for CopyInto<'a, R, W> where CopyBufInto<'a, BufReader, W>: Unpin {} -impl CopyInto { - unsafe_pinned!(inner: CopyBufInto, W>); +impl<'a, R: AsyncRead, W> CopyInto<'a, R, W> { + unsafe_pinned!(inner: CopyBufInto<'a, BufReader, W>); - pub(super) fn new(reader: R, writer: W) -> Self { + pub(super) fn new(reader: R, writer: &mut W) -> CopyInto<'_, R, W> { CopyInto { inner: CopyBufInto::new(BufReader::new(reader), writer), } } } -impl Future for CopyInto { +impl Future for CopyInto<'_, R, W> { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index c1b95d5d4e..5a6701bb66 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -93,9 +93,6 @@ pub trait AsyncReadExt: AsyncRead { /// /// On success the number of bytes is returned. /// - /// Note that this method consumes `writer` but does not close it, you will likely want to pass - /// it by reference as shown in the example. - /// /// # Examples /// /// ``` @@ -114,7 +111,7 @@ pub trait AsyncReadExt: AsyncRead { /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_into(self, writer: W) -> CopyInto where Self: Sized { + fn copy_into(self, writer: &mut W) -> CopyInto<'_, Self, W> where Self: Sized { CopyInto::new(self, writer) } @@ -452,9 +449,6 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// /// On success the number of bytes is returned. /// - /// Note that this method consumes `writer` but does not close it, you will likely want to pass - /// it by reference as shown in the example. - /// /// # Examples /// /// ``` @@ -473,7 +467,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_buf_into(self, writer: W) -> CopyBufInto where Self: Sized { + fn copy_buf_into(self, writer: &mut W) -> CopyBufInto<'_, Self, W> where Self: Sized { CopyBufInto::new(self, writer) } From 63d560338613cfc5b925c97d5ef8f42515c2cf94 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Tue, 25 Jun 2019 19:55:15 +0200 Subject: [PATCH 4/4] Allow unsized writer for copy_{buf_}into --- futures-util/src/io/copy_buf_into.rs | 10 +++++----- futures-util/src/io/copy_into.rs | 8 ++++---- futures-util/src/io/mod.rs | 12 ++++++++++-- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/futures-util/src/io/copy_buf_into.rs b/futures-util/src/io/copy_buf_into.rs index e9bd2baa35..37b122a8d9 100644 --- a/futures-util/src/io/copy_buf_into.rs +++ b/futures-util/src/io/copy_buf_into.rs @@ -7,15 +7,15 @@ use std::pin::Pin; /// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyBufInto<'a, R, W> { +pub struct CopyBufInto<'a, R, W: ?Sized> { reader: R, writer: &'a mut W, amt: u64, } -impl Unpin for CopyBufInto<'_, R, W> {} +impl Unpin for CopyBufInto<'_, R, W> {} -impl CopyBufInto<'_, R, W> { +impl CopyBufInto<'_, R, W> { pub(super) fn new(reader: R, writer: &mut W) -> CopyBufInto<'_, R, W> { CopyBufInto { reader, @@ -25,7 +25,7 @@ impl CopyBufInto<'_, R, W> { } } -impl CopyBufInto<'_, R, W> { +impl CopyBufInto<'_, R, W> { fn project<'b>(self: Pin<&'b mut Self>) -> (Pin<&'b mut R>, Pin<&'b mut W>, &'b mut u64) { unsafe { let this = self.get_unchecked_mut(); @@ -36,7 +36,7 @@ impl CopyBufInto<'_, R, W> { impl Future for CopyBufInto<'_, R, W> where R: AsyncBufRead, - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + ?Sized, { type Output = io::Result; diff --git a/futures-util/src/io/copy_into.rs b/futures-util/src/io/copy_into.rs index df13706b5a..198386da49 100644 --- a/futures-util/src/io/copy_into.rs +++ b/futures-util/src/io/copy_into.rs @@ -9,13 +9,13 @@ use pin_utils::unsafe_pinned; /// Future for the [`copy_into`](super::AsyncReadExt::copy_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyInto<'a, R: AsyncRead, W> { +pub struct CopyInto<'a, R: AsyncRead, W: ?Sized> { inner: CopyBufInto<'a, BufReader, W>, } -impl<'a, R: AsyncRead, W> Unpin for CopyInto<'a, R, W> where CopyBufInto<'a, BufReader, W>: Unpin {} +impl<'a, R: AsyncRead, W: ?Sized> Unpin for CopyInto<'a, R, W> where CopyBufInto<'a, BufReader, W>: Unpin {} -impl<'a, R: AsyncRead, W> CopyInto<'a, R, W> { +impl<'a, R: AsyncRead, W: ?Sized> CopyInto<'a, R, W> { unsafe_pinned!(inner: CopyBufInto<'a, BufReader, W>); pub(super) fn new(reader: R, writer: &mut W) -> CopyInto<'_, R, W> { @@ -25,7 +25,7 @@ impl<'a, R: AsyncRead, W> CopyInto<'a, R, W> { } } -impl Future for CopyInto<'_, R, W> { +impl Future for CopyInto<'_, R, W> { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 5a6701bb66..ac9964725d 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -111,7 +111,11 @@ pub trait AsyncReadExt: AsyncRead { /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_into(self, writer: &mut W) -> CopyInto<'_, Self, W> where Self: Sized { + fn copy_into(self, writer: &mut W) -> CopyInto<'_, Self, W> + where + Self: Sized, + W: AsyncWrite + Unpin + ?Sized, + { CopyInto::new(self, writer) } @@ -467,7 +471,11 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_buf_into(self, writer: &mut W) -> CopyBufInto<'_, Self, W> where Self: Sized { + fn copy_buf_into(self, writer: &mut W) -> CopyBufInto<'_, Self, W> + where + Self: Sized, + W: AsyncWrite + Unpin + ?Sized, + { CopyBufInto::new(self, writer) }