Skip to content

Add AsyncRead::poll_read_buf and AsyncWrite::poll_write_buf #1826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,23 @@ matrix:
--no-default-features
--features alloc

- name: cargo check (futures-util)
- name: cargo check (sub crates)
rust: nightly
script:
- cargo run --manifest-path ci/remove-dev-dependencies/Cargo.toml */Cargo.toml

# futures-io
# Check default-features, all-features
- cargo check --manifest-path futures-io/Cargo.toml
- cargo check --manifest-path futures-io/Cargo.toml --all-features
# Check each features
- cargo check --manifest-path futures-io/Cargo.toml --features bytes,unstable

# futures-util
# Check default-features, all-features
- cargo check --manifest-path futures-util/Cargo.toml
- cargo check --manifest-path futures-util/Cargo.toml --all-features

# Check each features
- cargo check --manifest-path futures-util/Cargo.toml --features sink
- cargo check --manifest-path futures-util/Cargo.toml --features io
- cargo check --manifest-path futures-util/Cargo.toml --features channel
Expand All @@ -150,6 +159,7 @@ matrix:
- cargo check --manifest-path futures-util/Cargo.toml --features sink,bilock,unstable
- cargo check --manifest-path futures-util/Cargo.toml --features io,bilock,unstable
- cargo check --manifest-path futures-util/Cargo.toml --features sink,io
- cargo check --manifest-path futures-util/Cargo.toml --features bytes,unstable

- cargo check --manifest-path futures-util/Cargo.toml --no-default-features
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features sink
Expand Down
6 changes: 6 additions & 0 deletions futures-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ name = "futures_io"
default = ["std"]
std = []

# Unstable features
# `bytes` feature is outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = []

[dependencies]
bytes = { version = "0.4.7", optional = true }

[dev-dependencies]
futures-preview = { path = "../futures", version = "=0.3.0-alpha.18" }
68 changes: 68 additions & 0 deletions futures-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_io")]

#[cfg(all(feature = "bytes", not(feature = "unstable")))]
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(feature = "std")]
mod if_std {
use std::cmp;
Expand All @@ -40,6 +43,10 @@ mod if_std {
SeekFrom as SeekFrom,
};

#[cfg(feature = "bytes")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use bytes::{Buf, BufMut};

/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods, modeled after `std`.
#[derive(Debug)]
Expand Down Expand Up @@ -154,6 +161,42 @@ mod if_std {
Poll::Ready(Ok(0))
}
}

/// Pull some bytes from this source into the specified `BufMut`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
#[cfg(feature = "bytes")]
fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

unsafe {
let n = {
let mut b = buf.bytes_mut();

self.initializer().initialize(&mut b);

match self.poll_read(cx, b)? {
Poll::Ready(n) => n,
Poll::Pending => return Poll::Pending,
}
};

buf.advance_mut(n);
Poll::Ready(Ok(n))
}
}
}

/// Write bytes asynchronously.
Expand Down Expand Up @@ -250,6 +293,31 @@ mod if_std {
/// `Poll::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;

/// Write a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
#[cfg(feature = "bytes")]
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining() {
return Poll::Ready(Ok(0));
}

let n = match self.poll_write(cx, buf.bytes())? {
Poll::Ready(n) => n,
Poll::Pending => return Poll::Pending,
};
buf.advance(n);
Poll::Ready(Ok(n))
}
}

/// Seek bytes asynchronously.
Expand Down
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack"
unstable = ["futures-core-preview/unstable"]
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic"]
bilock = []
bytes = ["io", "futures-io-preview/bytes", "futures-io-preview/unstable"]

[dependencies]
futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.18", default-features = false }
Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
IoSlice, IoSliceMut, Result, SeekFrom,
};
#[cfg(feature = "bytes")]
pub use futures_io::{Buf, BufMut};

#[cfg(feature = "io-compat")] use crate::compat::Compat;

Expand Down Expand Up @@ -61,6 +63,11 @@ pub use self::read::Read;
mod read_vectored;
pub use self::read_vectored::ReadVectored;

#[cfg(feature = "bytes")]
mod read_buf;
#[cfg(feature = "bytes")]
pub use self::read_buf::ReadBuf;

mod read_exact;
pub use self::read_exact::ReadExact;

Expand Down Expand Up @@ -100,6 +107,11 @@ pub use self::write::Write;
mod write_vectored;
pub use self::write_vectored::WriteVectored;

#[cfg(feature = "bytes")]
mod write_buf;
#[cfg(feature = "bytes")]
pub use self::write_buf::WriteBuf;

mod write_all;
pub use self::write_all::WriteAll;

Expand Down Expand Up @@ -213,6 +225,17 @@ pub trait AsyncReadExt: AsyncRead {
ReadVectored::new(self, bufs)
}

/// Creates a future which will read from the `AsyncRead` into `buf`.
///
/// The returned future will resolve to the number of bytes read once the read
/// operation is completed.
#[cfg(feature = "bytes")]
fn read_buf<'a, B: BufMut>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where Self: Unpin,
{
ReadBuf::new(self, buf)
}

/// Creates a future which will read exactly enough bytes to fill `buf`,
/// returning an error if end of file (EOF) is hit sooner.
///
Expand Down Expand Up @@ -455,6 +478,17 @@ pub trait AsyncWriteExt: AsyncWrite {
WriteVectored::new(self, bufs)
}

/// Creates a future which will write bytes from `buf` into the object.
///
/// The returned future will resolve to the number of bytes written once the write
/// operation is completed.
#[cfg(feature = "bytes")]
fn write_buf<'a, B: Buf>(&'a mut self, buf: &'a mut B) -> WriteBuf<'a, Self, B>
where Self: Unpin,
{
WriteBuf::new(self, buf)
}

/// Write data into this object.
///
/// Creates a future that will write the entire contents of the buffer `buf` into
Expand Down
30 changes: 30 additions & 0 deletions futures-util/src/io/read_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, BufMut};
use std::io;
use std::pin::Pin;

/// Future for the [`read_buf`](super::AsyncReadExt::read_buf) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadBuf<'a, R: ?Sized + Unpin, B> {
reader: &'a mut R,
buf: &'a mut B,
}

impl<R: ?Sized + Unpin, B> Unpin for ReadBuf<'_, R, B> {}

impl<'a, R: AsyncRead + ?Sized + Unpin, B: BufMut> ReadBuf<'a, R, B> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut B) -> Self {
Self { reader, buf }
}
}

impl<R: AsyncRead + ?Sized + Unpin, B: BufMut> Future for ReadBuf<'_, R, B> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
Pin::new(&mut this.reader).poll_read_buf(cx, this.buf)
}
}
30 changes: 30 additions & 0 deletions futures-util/src/io/write_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncWrite, Buf};
use std::io;
use std::pin::Pin;

/// Future for the [`write_buf`](super::AsyncWriteExt::write_buf) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteBuf<'a, W: ?Sized + Unpin, B> {
writer: &'a mut W,
buf: &'a mut B,
}

impl<W: ?Sized + Unpin, B> Unpin for WriteBuf<'_, W, B> {}

impl<'a, W: AsyncWrite + ?Sized + Unpin, B: Buf> WriteBuf<'a, W, B> {
pub(super) fn new(writer: &'a mut W, buf: &'a mut B) -> Self {
Self { writer, buf }
}
}

impl<W: AsyncWrite + ?Sized + Unpin, B: Buf> Future for WriteBuf<'_, W, B> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
Pin::new(&mut this.writer).poll_write_buf(cx, this.buf)
}
}
3 changes: 3 additions & 0 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(all(feature = "bytes", not(feature = "unstable")))]
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(feature = "alloc")]
extern crate alloc;

Expand Down
4 changes: 3 additions & 1 deletion futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pin-utils = "0.1.0-alpha.4"
futures-test-preview = { path = "../futures-test", version = "=0.3.0-alpha.18" }
tokio = "0.1.11"
assert_matches = "1.3.0"
bytes_crate = { version = "0.4.7", package = "bytes" }

[features]
default = ["std"]
Expand All @@ -46,9 +47,10 @@ io-compat = ["compat", "futures-util-preview/io-compat"]
# Unstable features
# These features are outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-util-preview/unstable"]
unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-io-preview/unstable", "futures-util-preview/unstable"]
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic", "futures-channel-preview/cfg-target-has-atomic", "futures-util-preview/cfg-target-has-atomic"]
bilock = ["futures-util-preview/bilock"]
bytes = ["futures-io-preview/bytes", "futures-util-preview/bytes"]

[package.metadata.docs.rs]
all-features = true
9 changes: 9 additions & 0 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(all(feature = "bytes", not(feature = "unstable")))]
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[doc(hidden)] pub use futures_core::core_reexport;

#[doc(hidden)] pub use futures_core::future::Future;
Expand Down Expand Up @@ -308,6 +311,12 @@ pub mod io {
ReadToString, ReadUntil, ReadVectored, repeat, Repeat, Seek, sink, Sink,
Take, Window, Write, WriteAll, WriteHalf, WriteVectored,
};

#[cfg(feature = "bytes")]
pub use futures_io::{Buf, BufMut};

#[cfg(feature = "bytes")]
pub use futures_util::io::{ReadBuf, WriteBuf};
}

#[cfg_attr(
Expand Down
Loading