diff --git a/futures-util/src/io/copy_buf_into.rs b/futures-util/src/io/copy_buf_into.rs new file mode 100644 index 0000000000..37b122a8d9 --- /dev/null +++ b/futures-util/src/io/copy_buf_into.rs @@ -0,0 +1,60 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use std::io; +use std::pin::Pin; + +/// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct CopyBufInto<'a, R, W: ?Sized> { + reader: R, + writer: &'a mut W, + amt: u64, +} + +impl Unpin for CopyBufInto<'_, R, W> {} + +impl CopyBufInto<'_, R, W> { + pub(super) fn new(reader: R, writer: &mut W) -> CopyBufInto<'_, R, W> { + CopyBufInto { + reader, + writer, + amt: 0, + } + } +} + +impl CopyBufInto<'_, R, W> { + fn project<'b>(self: Pin<&'b mut Self>) -> (Pin<&'b mut R>, Pin<&'b mut W>, &'b mut u64) { + unsafe { + let this = self.get_unchecked_mut(); + (Pin::new_unchecked(&mut this.reader), Pin::new(&mut *this.writer), &mut this.amt) + } + } +} + +impl Future for CopyBufInto<'_, R, W> + where R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (mut reader, mut writer, amt) = self.project(); + loop { + let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(writer.as_mut().poll_flush(cx))?; + return Poll::Ready(Ok(*amt)); + } + + let i = ready!(writer.as_mut().poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + } + *amt += i as u64; + reader.as_mut().consume(i); + } + } +} diff --git a/futures-util/src/io/copy_into.rs b/futures-util/src/io/copy_into.rs index 629a7783c3..198386da49 100644 --- a/futures-util/src/io/copy_into.rs +++ b/futures-util/src/io/copy_into.rs @@ -3,75 +3,32 @@ use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite}; use std::io; use std::pin::Pin; +use super::{BufReader, CopyBufInto}; +use pin_utils::unsafe_pinned; /// Future for the [`copy_into`](super::AsyncReadExt::copy_into) method. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyInto<'a, R: ?Sized + Unpin, W: ?Sized + Unpin> { - reader: &'a mut R, - read_done: bool, - writer: &'a mut W, - pos: usize, - cap: usize, - amt: u64, - buf: Box<[u8]>, +pub struct CopyInto<'a, R: AsyncRead, W: ?Sized> { + inner: CopyBufInto<'a, BufReader, W>, } -impl Unpin for CopyInto<'_, R, W> {} +impl<'a, R: AsyncRead, W: ?Sized> Unpin for CopyInto<'a, R, W> where CopyBufInto<'a, BufReader, W>: Unpin {} -impl<'a, R: ?Sized + Unpin, W: ?Sized + Unpin> CopyInto<'a, R, W> { - pub(super) fn new(reader: &'a mut R, writer: &'a mut W) -> Self { +impl<'a, R: AsyncRead, W: ?Sized> CopyInto<'a, R, W> { + unsafe_pinned!(inner: CopyBufInto<'a, BufReader, W>); + + pub(super) fn new(reader: R, writer: &mut W) -> CopyInto<'_, R, W> { CopyInto { - reader, - read_done: false, - writer, - amt: 0, - pos: 0, - cap: 0, - buf: Box::new([0; 2048]), + inner: CopyBufInto::new(BufReader::new(reader), writer), } } } -impl Future for CopyInto<'_, R, W> - where R: AsyncRead + ?Sized + Unpin, - W: AsyncWrite + ?Sized + Unpin, -{ +impl Future for CopyInto<'_, R, W> { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - loop { - // If our buffer is empty, then we need to read some data to - // continue. - if this.pos == this.cap && !this.read_done { - let n = ready!(Pin::new(&mut this.reader).poll_read(cx, &mut this.buf))?; - if n == 0 { - this.read_done = true; - } else { - this.pos = 0; - this.cap = n; - } - } - - // If our buffer has some data, let's write it out! - while this.pos < this.cap { - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buf[this.pos..this.cap]))?; - if i == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } else { - this.pos += i; - this.amt += i as u64; - } - } - - // If we've written al the data and we've seen EOF, flush out the - // data and finish the transfer. - // done with the entire transfer. - if this.pos == this.cap && this.read_done { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; - return Poll::Ready(Ok(this.amt)); - } - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner().poll(cx) } } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 1840df75fe..ac9964725d 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -31,6 +31,9 @@ pub use self::buf_writer::BufWriter; mod copy_into; pub use self::copy_into::CopyInto; +mod copy_buf_into; +pub use self::copy_buf_into::CopyBufInto; + mod flush; pub use self::flush::Flush; @@ -95,23 +98,23 @@ pub trait AsyncReadExt: AsyncRead { /// ``` /// #![feature(async_await)] /// # futures::executor::block_on(async { - /// use futures::io::AsyncReadExt; + /// use futures::io::{AsyncReadExt, AsyncWriteExt}; /// use std::io::Cursor; /// - /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let reader = Cursor::new([1, 2, 3, 4]); /// let mut writer = Cursor::new([0u8; 5]); /// /// let bytes = reader.copy_into(&mut writer).await?; + /// writer.close().await?; /// /// assert_eq!(bytes, 4); /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` - fn copy_into<'a, W>( - &'a mut self, - writer: &'a mut W, - ) -> CopyInto<'a, Self, W> - where Self: Unpin, W: AsyncWrite + Unpin, + fn copy_into(self, writer: &mut W) -> CopyInto<'_, Self, W> + where + Self: Sized, + W: AsyncWrite + Unpin + ?Sized, { CopyInto::new(self, writer) } @@ -253,12 +256,12 @@ pub trait AsyncReadExt: AsyncRead { /// // seek position. This may or may not be true for other types that /// // implement both `AsyncRead` and `AsyncWrite`. /// - /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let reader = Cursor::new([1, 2, 3, 4]); /// let mut buffer = Cursor::new([0, 0, 0, 0, 5, 6, 7, 8]); /// let mut writer = Cursor::new([0u8; 5]); /// /// { - /// let (mut buffer_reader, mut buffer_writer) = (&mut buffer).split(); + /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split(); /// reader.copy_into(&mut buffer_writer).await?; /// buffer_reader.copy_into(&mut writer).await?; /// } @@ -441,6 +444,41 @@ impl AsyncSeekExt for S {} /// An extension trait which adds utility methods to `AsyncBufRead` types. pub trait AsyncBufReadExt: AsyncBufRead { + /// Creates a future which copies all the bytes from one object to another. + /// + /// The returned future will copy all the bytes read from this `AsyncBufRead` into the + /// `writer` specified. This future will only complete once the `reader` has hit + /// EOF and all bytes have been written to and flushed from the `writer` + /// provided. + /// + /// On success the number of bytes is returned. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncBufReadExt, AsyncWriteExt}; + /// use std::io::Cursor; + /// + /// let reader = Cursor::new([1, 2, 3, 4]); + /// let mut writer = Cursor::new([0u8; 5]); + /// + /// let bytes = reader.copy_buf_into(&mut writer).await?; + /// writer.close().await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn copy_buf_into(self, writer: &mut W) -> CopyBufInto<'_, Self, W> + where + Self: Sized, + W: AsyncWrite + Unpin + ?Sized, + { + CopyBufInto::new(self, writer) + } + /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until the delimiter `byte` or EOF is reached. /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).