Skip to content

Commit 2fd8d53

Browse files
committed
refactor(file_store): Use BufReader but simplify
1 parent c871764 commit 2fd8d53

File tree

1 file changed

+28
-88
lines changed

1 file changed

+28
-88
lines changed

crates/file_store/src/entry_iter.rs

Lines changed: 28 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,16 @@ use std::{
77

88
use crate::bincode_options;
99

10-
type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
11-
1210
/// Iterator over entries in a file store.
1311
///
1412
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
1513
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
1614
///
1715
/// [`next`]: Self::next
1816
pub struct EntryIter<'t, T> {
19-
db_file: Option<EntryReader<'t>>,
20-
17+
/// Buffered reader around the file
18+
db_file: BufReader<&'t mut File>,
19+
finished: bool,
2120
/// The file position for the first read of `db_file`.
2221
start_pos: Option<u64>,
2322
types: PhantomData<T>,
@@ -26,8 +25,9 @@ pub struct EntryIter<'t, T> {
2625
impl<'t, T> EntryIter<'t, T> {
2726
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
2827
Self {
29-
db_file: Some(CountingReader::new(BufReader::new(db_file))),
28+
db_file: BufReader::new(db_file),
3029
start_pos: Some(start_pos),
30+
finished: false,
3131
types: PhantomData,
3232
}
3333
}
@@ -40,45 +40,33 @@ where
4040
type Item = Result<T, IterError>;
4141

4242
fn next(&mut self) -> Option<Self::Item> {
43-
// closure which reads a single entry starting from `self.pos`
44-
let read_one =
45-
|f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
46-
if let Some(pos) = start_pos {
47-
f.seek(io::SeekFrom::Start(pos))?;
43+
if self.finished {
44+
return None;
45+
}
46+
(|| {
47+
if let Some(start) = self.start_pos.take() {
48+
self.db_file.seek(io::SeekFrom::Start(start))?;
49+
}
50+
51+
let pos_before_read = self.db_file.stream_position()?;
52+
match bincode_options().deserialize_from(&mut self.db_file) {
53+
Ok(changeset) => {
54+
Ok(Some(changeset))
4855
}
49-
match bincode_options().deserialize_from(&mut *f) {
50-
Ok(changeset) => {
51-
f.clear_count();
52-
Ok(Some(changeset))
53-
}
54-
Err(e) => {
55-
// allow unexpected EOF if 0 bytes were read
56-
if let bincode::ErrorKind::Io(inner) = &*e {
57-
if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
58-
f.clear_count();
59-
return Ok(None);
60-
}
56+
Err(e) => {
57+
self.finished = true;
58+
let pos_after_read = self.db_file.stream_position()?;
59+
// allow unexpected EOF if 0 bytes were read
60+
if let bincode::ErrorKind::Io(inner) = &*e {
61+
if inner.kind() == io::ErrorKind::UnexpectedEof && pos_after_read == pos_before_read {
62+
return Ok(None);
6163
}
62-
f.rewind()?;
63-
Err(IterError::Bincode(*e))
6464
}
65+
self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
66+
Err(IterError::Bincode(*e))
6567
}
66-
};
67-
let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
68-
if result.is_err() {
69-
self.db_file = None;
70-
}
71-
result.transpose()
72-
}
73-
}
74-
75-
impl<'t, T> Drop for EntryIter<'t, T> {
76-
fn drop(&mut self) {
77-
if let Some(r) = self.db_file.as_mut() {
78-
// This syncs the underlying file's offset with the buffer's position. This way, no data
79-
// is lost with future reads.
80-
let _ = r.stream_position();
81-
}
68+
}
69+
})().transpose()
8270
}
8371
}
8472

@@ -107,51 +95,3 @@ impl From<io::Error> for IterError {
10795
}
10896

10997
impl std::error::Error for IterError {}
110-
111-
/// A wrapped [`Reader`] which counts total bytes read.
112-
struct CountingReader<R> {
113-
r: R,
114-
n: u64,
115-
}
116-
117-
impl<R> CountingReader<R> {
118-
fn new(file: R) -> Self {
119-
Self { r: file, n: 0 }
120-
}
121-
122-
/// Counted bytes read.
123-
fn count(&self) -> u64 {
124-
self.n
125-
}
126-
127-
/// Clear read count.
128-
fn clear_count(&mut self) {
129-
self.n = 0;
130-
}
131-
}
132-
133-
impl<R: io::Seek> CountingReader<R> {
134-
/// Rewind file descriptor offset to before all counted read operations. Then clear the read
135-
/// count.
136-
fn rewind(&mut self) -> io::Result<u64> {
137-
let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
138-
self.n = 0;
139-
Ok(read)
140-
}
141-
}
142-
143-
impl<R: io::Read> io::Read for CountingReader<R> {
144-
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
145-
let read = self.r.read(&mut *buf)?;
146-
self.n += read as u64;
147-
Ok(read)
148-
}
149-
}
150-
151-
impl<R: io::Seek> io::Seek for CountingReader<R> {
152-
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
153-
let res = self.r.seek(pos);
154-
self.n = 0;
155-
res
156-
}
157-
}

0 commit comments

Comments
 (0)