Skip to content

Commit 83ee9e7

Browse files
taiki-e authored and cramertj committed
Add async BufReader
1 parent 052f7b6 commit 83ee9e7

File tree

4 files changed

+652
-1
lines changed

4 files changed

+652
-1
lines changed

futures-util/src/io/buf_reader.rs

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
use futures_core::task::{Context, Poll};
2+
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoSliceMut, SeekFrom};
3+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
4+
use std::io::{self, Read};
5+
use std::pin::Pin;
6+
use std::{cmp, fmt};
7+
use super::DEFAULT_BUF_SIZE;
8+
9+
/// An adapter that layers an in-memory read buffer on top of any reader.
///
/// Working directly with an [`AsyncRead`] instance can be excessively
/// inefficient. A `BufReader` instead issues large, infrequent reads against
/// the underlying [`AsyncRead`] and keeps the results in an in-memory buffer.
///
/// `BufReader` can speed up programs that make *small* and *repeated* read
/// calls to the same file or network socket. It does not help when reading
/// very large amounts at once, or when reading just one or a few times. It
/// also offers no advantage when the source is already in memory, like a
/// `Vec<u8>`.
///
/// Dropping a `BufReader` discards the contents of its buffer. Creating
/// multiple instances of a `BufReader` on the same stream can cause data
/// loss.
///
/// [`AsyncRead`]: futures_io::AsyncRead
///
// TODO: Examples
pub struct BufReader<R> {
    // The wrapped reader that buffered data is pulled from.
    inner: R,
    // Backing storage for buffered bytes; its length is the buffer capacity.
    buf: Box<[u8]>,
    // Index of the next unread byte in `buf`.
    pos: usize,
    // End of the buffered data in `buf`; `buf[pos..cap]` is what is currently buffered.
    cap: usize,
}
34+
35+
impl<R: AsyncRead> BufReader<R> {
36+
unsafe_pinned!(inner: R);
37+
unsafe_unpinned!(pos: usize);
38+
unsafe_unpinned!(cap: usize);
39+
40+
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
41+
/// but may change in the future.
42+
pub fn new(inner: R) -> Self {
43+
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
44+
}
45+
46+
/// Creates a new `BufReader` with the specified buffer capacity.
47+
pub fn with_capacity(capacity: usize, inner: R) -> Self {
48+
unsafe {
49+
let mut buffer = Vec::with_capacity(capacity);
50+
buffer.set_len(capacity);
51+
inner.initializer().initialize(&mut buffer);
52+
Self {
53+
inner,
54+
buf: buffer.into_boxed_slice(),
55+
pos: 0,
56+
cap: 0,
57+
}
58+
}
59+
}
60+
61+
/// Gets a reference to the underlying reader.
62+
///
63+
/// It is inadvisable to directly read from the underlying reader.
64+
pub fn get_ref(&self) -> &R {
65+
&self.inner
66+
}
67+
68+
/// Gets a mutable reference to the underlying reader.
69+
///
70+
/// It is inadvisable to directly read from the underlying reader.
71+
pub fn get_mut(&mut self) -> &mut R {
72+
&mut self.inner
73+
}
74+
75+
/// Gets a pinned mutable reference to the underlying reader.
76+
///
77+
/// It is inadvisable to directly read from the underlying reader.
78+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> {
79+
self.inner()
80+
}
81+
82+
/// Consumes this `BufWriter`, returning the underlying reader.
83+
///
84+
/// Note that any leftover data in the internal buffer is lost.
85+
pub fn into_inner(self) -> R {
86+
self.inner
87+
}
88+
89+
/// Returns a reference to the internally buffered data.
90+
///
91+
/// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
92+
pub fn buffer(&self) -> &[u8] {
93+
&self.buf[self.pos..self.cap]
94+
}
95+
96+
/// Invalidates all data in the internal buffer.
97+
#[inline]
98+
fn discard_buffer(mut self: Pin<&mut Self>) {
99+
*self.as_mut().pos() = 0;
100+
*self.cap() = 0;
101+
}
102+
}
103+
104+
impl<R: AsyncRead + AsyncSeek> BufReader<R> {
105+
// https://github.com/rust-lang/rust/issues/31100
106+
/// Seeks relative to the current position. If the new position lies within the buffer,
107+
/// the buffer will not be flushed, allowing for more efficient seeks.
108+
/// This method does not return the location of the underlying reader, so the caller
109+
/// must track this information themselves if it is required.
110+
pub fn poll_seek_relative(
111+
mut self: Pin<&mut Self>,
112+
cx: &mut Context<'_>,
113+
offset: i64,
114+
) -> Poll<io::Result<()>> {
115+
let pos = self.pos as u64;
116+
if offset < 0 {
117+
if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
118+
*self.as_mut().pos() = new_pos as usize;
119+
return Poll::Ready(Ok(()));
120+
}
121+
} else if let Some(new_pos) = pos.checked_add(offset as u64) {
122+
if new_pos <= self.cap as u64 {
123+
*self.as_mut().pos() = new_pos as usize;
124+
return Poll::Ready(Ok(()));
125+
}
126+
}
127+
self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
128+
}
129+
}
130+
131+
impl<R: AsyncRead> AsyncRead for BufReader<R> {
132+
fn poll_read(
133+
mut self: Pin<&mut Self>,
134+
cx: &mut Context<'_>,
135+
buf: &mut [u8],
136+
) -> Poll<io::Result<usize>> {
137+
// If we don't have any buffered data and we're doing a massive read
138+
// (larger than our internal buffer), bypass our internal buffer
139+
// entirely.
140+
if self.pos == self.cap && buf.len() >= self.buf.len() {
141+
let res = ready!(self.as_mut().inner().poll_read(cx, buf));
142+
self.discard_buffer();
143+
return Poll::Ready(res);
144+
}
145+
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
146+
let nread = rem.read(buf)?;
147+
self.consume(nread);
148+
Poll::Ready(Ok(nread))
149+
}
150+
151+
fn poll_read_vectored(
152+
mut self: Pin<&mut Self>,
153+
cx: &mut Context<'_>,
154+
bufs: &mut [IoSliceMut<'_>],
155+
) -> Poll<io::Result<usize>> {
156+
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
157+
if self.pos == self.cap && total_len >= self.buf.len() {
158+
let res = ready!(self.as_mut().inner().poll_read_vectored(cx, bufs));
159+
self.discard_buffer();
160+
return Poll::Ready(res);
161+
}
162+
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
163+
let nread = rem.read_vectored(bufs)?;
164+
self.consume(nread);
165+
Poll::Ready(Ok(nread))
166+
}
167+
168+
// we can't skip unconditionally because of the large buffer case in read.
169+
unsafe fn initializer(&self) -> Initializer {
170+
self.inner.initializer()
171+
}
172+
}
173+
174+
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
175+
fn poll_fill_buf<'a>(
176+
self: Pin<&'a mut Self>,
177+
cx: &mut Context<'_>,
178+
) -> Poll<io::Result<&'a [u8]>> {
179+
let Self { inner, buf, cap, pos } = unsafe { Pin::get_unchecked_mut(self) };
180+
let mut inner = unsafe { Pin::new_unchecked(inner) };
181+
182+
// If we've reached the end of our internal buffer then we need to fetch
183+
// some more data from the underlying reader.
184+
// Branch using `>=` instead of the more correct `==`
185+
// to tell the compiler that the pos..cap slice is always valid.
186+
if *pos >= *cap {
187+
debug_assert!(*pos == *cap);
188+
*cap = ready!(inner.as_mut().poll_read(cx, buf))?;
189+
*pos = 0;
190+
}
191+
Poll::Ready(Ok(&buf[*pos..*cap]))
192+
}
193+
194+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
195+
*self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
196+
}
197+
}
198+
199+
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
    /// Formats the reader as `BufReader { reader: .., buffer: <buffered>/<capacity> }`,
    /// where `<buffered>` is the number of unread buffered bytes.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let buffered = self.cap - self.pos;
        let capacity = self.buf.len();

        let mut builder = fmt.debug_struct("BufReader");
        builder.field("reader", &self.inner);
        builder.field("buffer", &format_args!("{}/{}", buffered, capacity));
        builder.finish()
    }
}
207+
208+
impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
209+
/// Seek to an offset, in bytes, in the underlying reader.
210+
///
211+
/// The position used for seeking with `SeekFrom::Current(_)` is the
212+
/// position the underlying reader would be at if the `BufReader` had no
213+
/// internal buffer.
214+
///
215+
/// Seeking always discards the internal buffer, even if the seek position
216+
/// would otherwise fall within it. This guarantees that calling
217+
/// `.into_inner()` immediately after a seek yields the underlying reader
218+
/// at the same position.
219+
///
220+
/// To seek without discarding the internal buffer, use
221+
/// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
222+
///
223+
/// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
224+
///
225+
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
226+
/// where `n` minus the internal buffer length overflows an `i64`, two
227+
/// seeks will be performed instead of one. If the second seek returns
228+
/// `Err`, the underlying reader will be left at the same position it would
229+
/// have if you called `seek` with `SeekFrom::Current(0)`.
230+
fn poll_seek(
231+
mut self: Pin<&mut Self>,
232+
cx: &mut Context<'_>,
233+
pos: SeekFrom,
234+
) -> Poll<io::Result<u64>> {
235+
let result: u64;
236+
if let SeekFrom::Current(n) = pos {
237+
let remainder = (self.cap - self.pos) as i64;
238+
// it should be safe to assume that remainder fits within an i64 as the alternative
239+
// means we managed to allocate 8 exbibytes and that's absurd.
240+
// But it's not out of the realm of possibility for some weird underlying reader to
241+
// support seeking by i64::min_value() so we need to handle underflow when subtracting
242+
// remainder.
243+
if let Some(offset) = n.checked_sub(remainder) {
244+
result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(offset)))?;
245+
} else {
246+
// seek backwards by our remainder, and then by the offset
247+
ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(-remainder)))?;
248+
self.as_mut().discard_buffer();
249+
result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?;
250+
}
251+
} else {
252+
// Seeking with Start/End doesn't care about our buffer length.
253+
result = ready!(self.as_mut().inner().poll_seek(cx, pos))?;
254+
}
255+
self.discard_buffer();
256+
Poll::Ready(Ok(result))
257+
}
258+
}

futures-util/src/io/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,19 @@ pub use futures_io::{
1111

1212
#[cfg(feature = "io-compat")] use crate::compat::Compat;
1313

14+
// used by `BufReader` and `BufWriter`
15+
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
16+
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
17+
1418
mod allow_std;
1519
pub use self::allow_std::AllowStdIo;
1620

21+
mod buf_reader;
22+
pub use self::buf_reader::BufReader;
23+
24+
// mod buf_writer;
25+
// pub use self::buf_writer::BufWriter;
26+
1727
mod copy_into;
1828
pub use self::copy_into::CopyInto;
1929

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ pub mod io {
280280
};
281281

282282
pub use futures_util::io::{
283-
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
283+
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader,
284284
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
285285
Seek, Window, WriteAll, WriteHalf,
286286
};

0 commit comments

Comments
 (0)