Skip to content

Commit 85ad34a

Browse files
committed
Add AsyncRead::poll_read_buf and AsyncWrite::poll_write_buf
1 parent e9cd539 commit 85ad34a

File tree

11 files changed

+298
-3
lines changed

11 files changed

+298
-3
lines changed

.travis.yml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,23 @@ matrix:
128128
--no-default-features
129129
--features alloc
130130

131-
- name: cargo check (futures-util)
131+
- name: cargo check (sub crates)
132132
rust: nightly
133133
script:
134134
- cargo run --manifest-path ci/remove-dev-dependencies/Cargo.toml */Cargo.toml
135135

136+
# futures-io
137+
# Check default-features, all-features
138+
- cargo check --manifest-path futures-io/Cargo.toml
139+
- cargo check --manifest-path futures-io/Cargo.toml --all-features
140+
# Check each features
141+
- cargo check --manifest-path futures-io/Cargo.toml --features bytes,unstable
142+
143+
# futures-util
144+
# Check default-features, all-features
136145
- cargo check --manifest-path futures-util/Cargo.toml
137146
- cargo check --manifest-path futures-util/Cargo.toml --all-features
138-
147+
# Check each features
139148
- cargo check --manifest-path futures-util/Cargo.toml --features sink
140149
- cargo check --manifest-path futures-util/Cargo.toml --features io
141150
- cargo check --manifest-path futures-util/Cargo.toml --features channel
@@ -150,6 +159,7 @@ matrix:
150159
- cargo check --manifest-path futures-util/Cargo.toml --features sink,bilock,unstable
151160
- cargo check --manifest-path futures-util/Cargo.toml --features io,bilock,unstable
152161
- cargo check --manifest-path futures-util/Cargo.toml --features sink,io
162+
- cargo check --manifest-path futures-util/Cargo.toml --features bytes,unstable
153163

154164
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features
155165
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features sink

futures-io/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ name = "futures_io"
1818
default = ["std"]
1919
std = []
2020

21+
# Unstable features
22+
# `bytes` feature is outside of the normal semver guarantees and require the
23+
# `unstable` feature as an explicit opt-in to unstable API.
24+
unstable = []
25+
2126
[dependencies]
27+
bytes = { version = "0.4.7", optional = true }
2228

2329
[dev-dependencies]
2430
futures-preview = { path = "../futures", version = "=0.3.0-alpha.18" }

futures-io/src/lib.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_io")]
2121

22+
#[cfg(all(feature = "bytes", not(feature = "unstable")))]
23+
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");
24+
2225
#[cfg(feature = "std")]
2326
mod if_std {
2427
use std::cmp;
@@ -40,6 +43,10 @@ mod if_std {
4043
SeekFrom as SeekFrom,
4144
};
4245

46+
#[cfg(feature = "bytes")]
47+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
48+
pub use bytes::{Buf, BufMut};
49+
4350
/// A type used to conditionally initialize buffers passed to `AsyncRead`
4451
/// methods, modeled after `std`.
4552
#[derive(Debug)]
@@ -154,6 +161,42 @@ mod if_std {
154161
Poll::Ready(Ok(0))
155162
}
156163
}
164+
165+
/// Pull some bytes from this source into the specified `BufMut`, returning
166+
/// how many bytes were read.
167+
///
168+
/// The `buf` provided will have bytes read into it and the internal cursor
169+
/// will be advanced if any bytes were read. Note that this method typically
170+
/// will not reallocate the buffer provided.
171+
#[cfg(feature = "bytes")]
172+
fn poll_read_buf<B: BufMut>(
173+
self: Pin<&mut Self>,
174+
cx: &mut Context<'_>,
175+
buf: &mut B,
176+
) -> Poll<Result<usize>>
177+
where
178+
Self: Sized,
179+
{
180+
if !buf.has_remaining_mut() {
181+
return Poll::Ready(Ok(0));
182+
}
183+
184+
unsafe {
185+
let n = {
186+
let mut b = buf.bytes_mut();
187+
188+
self.initializer().initialize(&mut b);
189+
190+
match self.poll_read(cx, b)? {
191+
Poll::Ready(n) => n,
192+
Poll::Pending => return Poll::Pending,
193+
}
194+
};
195+
196+
buf.advance_mut(n);
197+
Poll::Ready(Ok(n))
198+
}
199+
}
157200
}
158201

159202
/// Write bytes asynchronously.
@@ -250,6 +293,31 @@ mod if_std {
250293
/// `Poll::Pending` and either internally retry or convert
251294
/// `Interrupted` into another error kind.
252295
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
296+
297+
/// Write a `Buf` into this value, returning how many bytes were written.
298+
///
299+
/// Note that this method will advance the `buf` provided automatically by
300+
/// the number of bytes written.
301+
#[cfg(feature = "bytes")]
302+
fn poll_write_buf<B: Buf>(
303+
self: Pin<&mut Self>,
304+
cx: &mut Context<'_>,
305+
buf: &mut B,
306+
) -> Poll<Result<usize>>
307+
where
308+
Self: Sized,
309+
{
310+
if !buf.has_remaining() {
311+
return Poll::Ready(Ok(0));
312+
}
313+
314+
let n = match self.poll_write(cx, buf.bytes())? {
315+
Poll::Ready(n) => n,
316+
Poll::Pending => return Poll::Pending,
317+
};
318+
buf.advance(n);
319+
Poll::Ready(Ok(n))
320+
}
253321
}
254322

255323
/// Seek bytes asynchronously.

futures-util/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack"
3333
unstable = ["futures-core-preview/unstable"]
3434
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic"]
3535
bilock = []
36+
bytes = ["io", "futures-io-preview/bytes", "futures-io-preview/unstable"]
3637

3738
[dependencies]
3839
futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.18", default-features = false }

futures-util/src/io/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub use futures_io::{
1313
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
1414
IoSlice, IoSliceMut, Result, SeekFrom,
1515
};
16+
#[cfg(feature = "bytes")]
17+
pub use futures_io::{Buf, BufMut};
1618

1719
#[cfg(feature = "io-compat")] use crate::compat::Compat;
1820

@@ -61,6 +63,11 @@ pub use self::read::Read;
6163
mod read_vectored;
6264
pub use self::read_vectored::ReadVectored;
6365

66+
#[cfg(feature = "bytes")]
67+
mod read_buf;
68+
#[cfg(feature = "bytes")]
69+
pub use self::read_buf::ReadBuf;
70+
6471
mod read_exact;
6572
pub use self::read_exact::ReadExact;
6673

@@ -100,6 +107,11 @@ pub use self::write::Write;
100107
mod write_vectored;
101108
pub use self::write_vectored::WriteVectored;
102109

110+
#[cfg(feature = "bytes")]
111+
mod write_buf;
112+
#[cfg(feature = "bytes")]
113+
pub use self::write_buf::WriteBuf;
114+
103115
mod write_all;
104116
pub use self::write_all::WriteAll;
105117

@@ -213,6 +225,17 @@ pub trait AsyncReadExt: AsyncRead {
213225
ReadVectored::new(self, bufs)
214226
}
215227

228+
/// Creates a future which will read from the `AsyncRead` into `buf`.
229+
///
230+
/// The returned future will resolve to the number of bytes read once the read
231+
/// operation is completed.
232+
#[cfg(feature = "bytes")]
233+
fn read_buf<'a, B: BufMut>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
234+
where Self: Unpin,
235+
{
236+
ReadBuf::new(self, buf)
237+
}
238+
216239
/// Creates a future which will read exactly enough bytes to fill `buf`,
217240
/// returning an error if end of file (EOF) is hit sooner.
218241
///
@@ -455,6 +478,17 @@ pub trait AsyncWriteExt: AsyncWrite {
455478
WriteVectored::new(self, bufs)
456479
}
457480

481+
/// Creates a future which will write bytes from `buf` into the object.
482+
///
483+
/// The returned future will resolve to the number of bytes written once the write
484+
/// operation is completed.
485+
#[cfg(feature = "bytes")]
486+
fn write_buf<'a, B: Buf>(&'a mut self, buf: &'a mut B) -> WriteBuf<'a, Self, B>
487+
where Self: Unpin,
488+
{
489+
WriteBuf::new(self, buf)
490+
}
491+
458492
/// Write data into this object.
459493
///
460494
/// Creates a future that will write the entire contents of the buffer `buf` into

futures-util/src/io/read_buf.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use futures_core::future::Future;
2+
use futures_core::task::{Context, Poll};
3+
use futures_io::{AsyncRead, BufMut};
4+
use std::io;
5+
use std::pin::Pin;
6+
7+
/// Future for the [`read_buf`](super::AsyncReadExt::read_buf) method.
8+
#[derive(Debug)]
9+
#[must_use = "futures do nothing unless you `.await` or poll them"]
10+
pub struct ReadBuf<'a, R: ?Sized + Unpin, B> {
11+
reader: &'a mut R,
12+
buf: &'a mut B,
13+
}
14+
15+
impl<R: ?Sized + Unpin, B> Unpin for ReadBuf<'_, R, B> {}
16+
17+
impl<'a, R: AsyncRead + ?Sized + Unpin, B: BufMut> ReadBuf<'a, R, B> {
18+
pub(super) fn new(reader: &'a mut R, buf: &'a mut B) -> Self {
19+
Self { reader, buf }
20+
}
21+
}
22+
23+
impl<R: AsyncRead + ?Sized + Unpin, B: BufMut> Future for ReadBuf<'_, R, B> {
24+
type Output = io::Result<usize>;
25+
26+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27+
let this = &mut *self;
28+
Pin::new(&mut this.reader).poll_read_buf(cx, this.buf)
29+
}
30+
}

futures-util/src/io/write_buf.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use futures_core::future::Future;
2+
use futures_core::task::{Context, Poll};
3+
use futures_io::{AsyncWrite, Buf};
4+
use std::io;
5+
use std::pin::Pin;
6+
7+
/// Future for the [`write_buf`](super::AsyncWriteExt::write_buf) method.
8+
#[derive(Debug)]
9+
#[must_use = "futures do nothing unless you `.await` or poll them"]
10+
pub struct WriteBuf<'a, W: ?Sized + Unpin, B> {
11+
writer: &'a mut W,
12+
buf: &'a mut B,
13+
}
14+
15+
impl<W: ?Sized + Unpin, B> Unpin for WriteBuf<'_, W, B> {}
16+
17+
impl<'a, W: AsyncWrite + ?Sized + Unpin, B: Buf> WriteBuf<'a, W, B> {
18+
pub(super) fn new(writer: &'a mut W, buf: &'a mut B) -> Self {
19+
Self { writer, buf }
20+
}
21+
}
22+
23+
impl<W: AsyncWrite + ?Sized + Unpin, B: Buf> Future for WriteBuf<'_, W, B> {
24+
type Output = io::Result<usize>;
25+
26+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27+
let this = &mut *self;
28+
Pin::new(&mut this.writer).poll_write_buf(cx, this.buf)
29+
}
30+
}

futures-util/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
1919
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
2020
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
2121

22+
#[cfg(all(feature = "bytes", not(feature = "unstable")))]
23+
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");
24+
2225
#[cfg(feature = "alloc")]
2326
extern crate alloc;
2427

futures/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pin-utils = "0.1.0-alpha.4"
3434
futures-test-preview = { path = "../futures-test", version = "=0.3.0-alpha.18" }
3535
tokio = "0.1.11"
3636
assert_matches = "1.3.0"
37+
bytes_crate = { version = "0.4.7", package = "bytes" }
3738

3839
[features]
3940
default = ["std"]
@@ -46,9 +47,10 @@ io-compat = ["compat", "futures-util-preview/io-compat"]
4647
# Unstable features
4748
# These features are outside of the normal semver guarantees and require the
4849
# `unstable` feature as an explicit opt-in to unstable API.
49-
unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-util-preview/unstable"]
50+
unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-io-preview/unstable", "futures-util-preview/unstable"]
5051
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic", "futures-channel-preview/cfg-target-has-atomic", "futures-util-preview/cfg-target-has-atomic"]
5152
bilock = ["futures-util-preview/bilock"]
53+
bytes = ["futures-io-preview/bytes", "futures-util-preview/bytes"]
5254

5355
[package.metadata.docs.rs]
5456
all-features = true

futures/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
4040
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
4141
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
4242

43+
#[cfg(all(feature = "bytes", not(feature = "unstable")))]
44+
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");
45+
4346
#[doc(hidden)] pub use futures_core::core_reexport;
4447

4548
#[doc(hidden)] pub use futures_core::future::Future;
@@ -308,6 +311,12 @@ pub mod io {
308311
ReadToString, ReadUntil, ReadVectored, repeat, Repeat, Seek, sink, Sink,
309312
Take, Window, Write, WriteAll, WriteHalf, WriteVectored,
310313
};
314+
315+
#[cfg(feature = "bytes")]
316+
pub use futures_io::{Buf, BufMut};
317+
318+
#[cfg(feature = "bytes")]
319+
pub use futures_util::io::{ReadBuf, WriteBuf};
311320
}
312321

313322
#[cfg_attr(

0 commit comments

Comments
 (0)