Skip to content

Commit fdfbef1

Browse files
committed
Add async BufReader
1 parent 50f3f71 commit fdfbef1

File tree

4 files changed

+591
-1
lines changed

4 files changed

+591
-1
lines changed

futures-util/src/io/buf_reader.rs

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
use futures_core::task::{Context, Poll};
2+
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoVec, SeekFrom};
3+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
4+
use std::io::{self, Read};
5+
use std::pin::Pin;
6+
use std::{cmp, fmt};
7+
use super::DEFAULT_BUF_SIZE;
8+
9+
/// The `BufReader` struct adds buffering to any reader.
10+
///
11+
/// It can be excessively inefficient to work directly with a [`AsyncRead`]
12+
/// instance. A `BufReader` performs large, infrequent reads on the underlying
13+
/// [`AsyncRead`] and maintains an in-memory buffer of the results.
14+
///
15+
/// `BufReader` can improve the speed of programs that make *small* and
16+
/// *repeated* read calls to the same file or network socket. It does not
17+
/// help when reading very large amounts at once, or reading just one or a few
18+
/// times. It also provides no advantage when reading from a source that is
19+
/// already in memory, like a `Vec<u8>`.
20+
///
21+
/// When the `BufReader` is dropped, the contents of its buffer will be
22+
/// discarded. Creating multiple instances of a `BufReader` on the same
23+
/// stream can cause data loss.
24+
///
25+
/// [`AsyncRead`]: futures_io::AsyncRead
26+
///
27+
// TODO: Examples
28+
pub struct BufReader<R> {
29+
inner: R,
30+
buf: Box<[u8]>,
31+
pos: usize,
32+
cap: usize,
33+
}
34+
35+
impl<R: AsyncRead> BufReader<R> {
36+
unsafe_pinned!(inner: R);
37+
unsafe_pinned!(buf: Box<[u8]>);
38+
unsafe_unpinned!(pos: usize);
39+
unsafe_unpinned!(cap: usize);
40+
41+
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
42+
/// but may change in the future.
43+
pub fn new(inner: R) -> Self {
44+
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
45+
}
46+
47+
/// Creates a new `BufReader` with the specified buffer capacity.
48+
pub fn with_capacity(capacity: usize, inner: R) -> Self {
49+
unsafe {
50+
let mut buffer = Vec::with_capacity(capacity);
51+
buffer.set_len(capacity);
52+
inner.initializer().initialize(&mut buffer);
53+
Self {
54+
inner,
55+
buf: buffer.into_boxed_slice(),
56+
pos: 0,
57+
cap: 0,
58+
}
59+
}
60+
}
61+
62+
/// Gets a reference to the underlying reader.
63+
///
64+
/// It is inadvisable to directly read from the underlying reader.
65+
pub fn get_ref(&self) -> &R {
66+
&self.inner
67+
}
68+
69+
/// Gets a mutable reference to the underlying reader.
70+
///
71+
/// It is inadvisable to directly read from the underlying reader.
72+
pub fn get_mut(&mut self) -> &mut R {
73+
&mut self.inner
74+
}
75+
76+
/// Gets a pinned mutable reference to the underlying reader.
77+
///
78+
/// It is inadvisable to directly read from the underlying reader.
79+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> {
80+
self.inner()
81+
}
82+
83+
/// Consumes this `BufWriter`, returning the underlying reader.
84+
///
85+
/// Note that any leftover data in the internal buffer is lost.
86+
pub fn into_inner(self) -> R {
87+
self.inner
88+
}
89+
90+
/// Returns a reference to the internally buffered data.
91+
///
92+
/// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
93+
pub fn buffer(&self) -> &[u8] {
94+
&self.buf[self.pos..self.cap]
95+
}
96+
97+
/// Invalidates all data in the internal buffer.
98+
#[inline]
99+
fn discard_buffer(mut self: Pin<&mut Self>) {
100+
*self.as_mut().pos() = 0;
101+
*self.cap() = 0;
102+
}
103+
}
104+
105+
impl<R: AsyncRead + AsyncSeek> BufReader<R> {
106+
// https://github.com/rust-lang/rust/issues/31100
107+
/// Seeks relative to the current position. If the new position lies within the buffer,
108+
/// the buffer will not be flushed, allowing for more efficient seeks.
109+
/// This method does not return the location of the underlying reader, so the caller
110+
/// must track this information themselves if it is required.
111+
pub fn poll_seek_relative(
112+
mut self: Pin<&mut Self>,
113+
cx: &mut Context<'_>,
114+
offset: i64,
115+
) -> Poll<io::Result<()>> {
116+
let pos = self.pos as u64;
117+
if offset < 0 {
118+
if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
119+
*self.as_mut().pos() = new_pos as usize;
120+
return Poll::Ready(Ok(()));
121+
}
122+
} else if let Some(new_pos) = pos.checked_add(offset as u64) {
123+
if new_pos <= self.cap as u64 {
124+
*self.as_mut().pos() = new_pos as usize;
125+
return Poll::Ready(Ok(()));
126+
}
127+
}
128+
self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
129+
}
130+
}
131+
132+
impl<R: AsyncRead> AsyncRead for BufReader<R> {
133+
fn poll_read(
134+
mut self: Pin<&mut Self>,
135+
cx: &mut Context<'_>,
136+
buf: &mut [u8],
137+
) -> Poll<io::Result<usize>> {
138+
// If we don't have any buffered data and we're doing a massive read
139+
// (larger than our internal buffer), bypass our internal buffer
140+
// entirely.
141+
if self.pos == self.cap && buf.len() >= self.buf.len() {
142+
let res = ready!(self.as_mut().inner().poll_read(cx, buf));
143+
self.discard_buffer();
144+
return Poll::Ready(res);
145+
}
146+
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
147+
let nread = rem.read(buf)?;
148+
self.consume(nread);
149+
Poll::Ready(Ok(nread))
150+
}
151+
152+
fn poll_vectored_read(
153+
mut self: Pin<&mut Self>,
154+
cx: &mut Context<'_>,
155+
bufs: &mut [&mut IoVec],
156+
) -> Poll<io::Result<usize>> {
157+
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
158+
if self.pos == self.cap && total_len >= self.buf.len() {
159+
let res = ready!(self.as_mut().inner().poll_vectored_read(cx, bufs));
160+
self.discard_buffer();
161+
return Poll::Ready(res);
162+
}
163+
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
164+
// TODO: replace with `rem.read_vectored(bufs)?`
165+
let nread = ready!(Pin::new(&mut rem).poll_vectored_read(cx, bufs))?;
166+
self.consume(nread);
167+
Poll::Ready(Ok(nread))
168+
}
169+
170+
// we can't skip unconditionally because of the large buffer case in read.
171+
unsafe fn initializer(&self) -> Initializer {
172+
self.inner.initializer()
173+
}
174+
}
175+
176+
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
177+
fn poll_fill_buf<'a>(
178+
self: Pin<&'a mut Self>,
179+
cx: &mut Context<'_>,
180+
) -> Poll<io::Result<&'a [u8]>> {
181+
let Self { inner, buf, cap, pos } = unsafe { Pin::get_unchecked_mut(self) };
182+
let mut inner = unsafe { Pin::new_unchecked(inner) };
183+
184+
// If we've reached the end of our internal buffer then we need to fetch
185+
// some more data from the underlying reader.
186+
// Branch using `>=` instead of the more correct `==`
187+
// to tell the compiler that the pos..cap slice is always valid.
188+
if *pos >= *cap {
189+
debug_assert!(*pos == *cap);
190+
*cap = ready!(inner.as_mut().poll_read(cx, buf))?;
191+
*pos = 0;
192+
}
193+
Poll::Ready(Ok(&buf[*pos..*cap]))
194+
}
195+
196+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
197+
*self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
198+
}
199+
}
200+
201+
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
202+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
203+
fmt.debug_struct("BufReader")
204+
.field("reader", &self.inner)
205+
.field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
206+
.finish()
207+
}
208+
}
209+
210+
impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
211+
/// Seek to an offset, in bytes, in the underlying reader.
212+
///
213+
/// The position used for seeking with `SeekFrom::Current(_)` is the
214+
/// position the underlying reader would be at if the `BufReader` had no
215+
/// internal buffer.
216+
///
217+
/// Seeking always discards the internal buffer, even if the seek position
218+
/// would otherwise fall within it. This guarantees that calling
219+
/// `.into_inner()` immediately after a seek yields the underlying reader
220+
/// at the same position.
221+
///
222+
/// To seek without discarding the internal buffer, use
223+
/// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
224+
///
225+
/// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
226+
///
227+
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
228+
/// where `n` minus the internal buffer length overflows an `i64`, two
229+
/// seeks will be performed instead of one. If the second seek returns
230+
/// `Err`, the underlying reader will be left at the same position it would
231+
/// have if you called `seek` with `SeekFrom::Current(0)`.
232+
fn poll_seek(
233+
mut self: Pin<&mut Self>,
234+
cx: &mut Context<'_>,
235+
pos: SeekFrom,
236+
) -> Poll<io::Result<u64>> {
237+
let result: u64;
238+
if let SeekFrom::Current(n) = pos {
239+
let remainder = (self.cap - self.pos) as i64;
240+
// it should be safe to assume that remainder fits within an i64 as the alternative
241+
// means we managed to allocate 8 exbibytes and that's absurd.
242+
// But it's not out of the realm of possibility for some weird underlying reader to
243+
// support seeking by i64::min_value() so we need to handle underflow when subtracting
244+
// remainder.
245+
if let Some(offset) = n.checked_sub(remainder) {
246+
result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(offset)))?;
247+
} else {
248+
// seek backwards by our remainder, and then by the offset
249+
ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(-remainder)))?;
250+
result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?;
251+
}
252+
} else {
253+
// Seeking with Start/End doesn't care about our buffer length.
254+
result = ready!(self.as_mut().inner().poll_seek(cx, pos))?;
255+
}
256+
self.discard_buffer();
257+
Poll::Ready(Ok(result))
258+
}
259+
}

futures-util/src/io/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,19 @@ pub use futures_io::{
1111

1212
#[cfg(feature = "io-compat")] use crate::compat::Compat;
1313

14+
// used by `BufReader` and `BufWriter`
15+
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
16+
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
17+
1418
mod allow_std;
1519
pub use self::allow_std::AllowStdIo;
1620

21+
mod buf_reader;
22+
pub use self::buf_reader::BufReader;
23+
24+
// mod buf_writer;
25+
// pub use self::buf_writer::BufWriter;
26+
1727
mod copy_into;
1828
pub use self::copy_into::CopyInto;
1929

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ pub mod io {
275275
Initializer, IoVec, Result, SeekFrom,
276276
};
277277
pub use futures_util::io::{
278-
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
278+
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader,
279279
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
280280
Seek, Window, WriteAll, WriteHalf,
281281
};

0 commit comments

Comments
 (0)