From a3aa8b6682a3a13958fd5fbadc4074a1907a78db Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BF=97=E5=AE=87?=
Date: Sun, 7 Jan 2024 16:09:03 +0800
Subject: [PATCH 1/4] feat(file_store)!: optimize `EntryIter` by reducing syscalls

* Wrap file reader with `BufReader`. This reduces calls to `read`.
* Wrap file reader with `CountingReader`. This counts the bytes read by
  the underlying reader. We can rewind without seeking first.
---
 crates/file_store/src/entry_iter.rs | 109 +++++++++++++++++++++-------
 1 file changed, 83 insertions(+), 26 deletions(-)

diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index 770f264f3..d95a67f8e 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -1,12 +1,14 @@
 use bincode::Options;
 use std::{
     fs::File,
-    io::{self, Seek},
+    io::{self, BufReader, Seek},
     marker::PhantomData,
 };
 
 use crate::bincode_options;
 
+type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
+
 /// Iterator over entries in a file store.
 ///
 /// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
@@ -14,7 +16,7 @@ use crate::bincode_options;
 ///
 /// [`next`]: Self::next
 pub struct EntryIter<'t, T> {
-    db_file: Option<&'t mut File>,
+    db_file: Option<EntryReader<'t>>,
 
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
@@ -24,7 +26,7 @@ pub struct EntryIter<'t, T> {
 impl<'t, T> EntryIter<'t, T> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
-            db_file: Some(db_file),
+            db_file: Some(CountingReader::new(BufReader::new(db_file))),
             start_pos: Some(start_pos),
             types: PhantomData,
         }
@@ -39,32 +41,29 @@ where
 
     fn next(&mut self) -> Option<Self::Item> {
         // closure which reads a single entry starting from `self.pos`
-        let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
-            let pos = match start_pos {
-                Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
-                None => f.stream_position()?,
-            };
-
-            match bincode_options().deserialize_from(&*f) {
-                Ok(changeset) => {
-                    f.stream_position()?;
-                    Ok(Some(changeset))
-                }
-                Err(e) => {
-                    if let bincode::ErrorKind::Io(inner) = &*e {
-                        if inner.kind() == io::ErrorKind::UnexpectedEof {
-                            let eof = f.seek(io::SeekFrom::End(0))?;
-                            if pos == eof {
-                                return Ok(None);
-                            }
-                        }
-                    }
-                    f.seek(io::SeekFrom::Start(pos))?;
-                    Err(IterError::Bincode(*e))
-                }
-            }
-        };
-
+        let read_one =
+            |f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
+                if let Some(pos) = start_pos {
+                    f.seek(io::SeekFrom::Start(pos))?;
+                }
+                match bincode_options().deserialize_from(&mut *f) {
+                    Ok(changeset) => {
+                        f.clear_count();
+                        Ok(Some(changeset))
+                    }
+                    Err(e) => {
+                        // allow unexpected EOF if 0 bytes were read
+                        if let bincode::ErrorKind::Io(inner) = &*e {
+                            if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
+                                f.clear_count();
+                                return Ok(None);
+                            }
+                        }
+                        f.rewind()?;
+                        Err(IterError::Bincode(*e))
+                    }
+                }
+            };
         let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
         if result.is_err() {
             self.db_file = None;
@@ -73,9 +72,13 @@ where
     }
 }
 
-impl From<io::Error> for IterError {
-    fn from(value: io::Error) -> Self {
-        IterError::Io(value)
+impl<'t, T> Drop for EntryIter<'t, T> {
+    fn drop(&mut self) {
+        if let Some(r) = self.db_file.as_mut() {
+            // This syncs the underlying file's offset with the buffer's position. This way, no
+            // data is lost with future reads.
+            let _ = r.stream_position();
+        }
     }
 }
 
@@ -97,4 +100,58 @@ impl core::fmt::Display for IterError {
     }
 }
 
+impl From<io::Error> for IterError {
+    fn from(value: io::Error) -> Self {
+        IterError::Io(value)
+    }
+}
+
 impl std::error::Error for IterError {}
+
+/// A wrapped [`Reader`] which counts total bytes read.
+struct CountingReader<R> {
+    r: R,
+    n: u64,
+}
+
+impl<R> CountingReader<R> {
+    fn new(file: R) -> Self {
+        Self { r: file, n: 0 }
+    }
+
+    /// Counted bytes read.
+    fn count(&self) -> u64 {
+        self.n
+    }
+
+    /// Clear read count.
+    fn clear_count(&mut self) {
+        self.n = 0;
+    }
+}
+
+impl<R: io::Seek> CountingReader<R> {
+    /// Rewind file descriptor offset to before all counted read operations. Then clear the read
+    /// count.
+    fn rewind(&mut self) -> io::Result<u64> {
+        let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
+        self.n = 0;
+        Ok(read)
+    }
+}
+
+impl<R: io::Read> io::Read for CountingReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let read = self.r.read(&mut *buf)?;
+        self.n += read as u64;
+        Ok(read)
+    }
+}
+
+impl<R: io::Seek> io::Seek for CountingReader<R> {
+    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
+        let res = self.r.seek(pos);
+        self.n = 0;
+        res
+    }
+}

From c8717646700bdac0f1e13ec499481bd1fee30ffd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BF=97=E5=AE=87?=
Date: Sat, 13 Jan 2024 17:47:13 +0800
Subject: [PATCH 2/4] test(file_store): `last_write_is_short`

This test simulates a situation where the last write to the db is short.
Aggregating the changesets after reopening the file should return an error
(which includes a partially-aggregated changeset) that aggregates only the
changesets that were fully written. At this point, the test re-writes the
final changeset (and this time it is written in full). The file should then
be recoverable with all changesets, including the last one.
---
 crates/file_store/src/store.rs | 76 ++++++++++++++++++++++++++++++++--
 1 file changed, 73 insertions(+), 3 deletions(-)

diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index ebab2fd00..83e5272fd 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -219,6 +219,7 @@ mod test {
     use bincode::DefaultOptions;
     use std::{
+        collections::BTreeSet,
         io::{Read, Write},
         vec::Vec,
     };
@@ -228,7 +229,7 @@ mod test {
     const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
         [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
 
-    type TestChangeSet = Vec<String>;
+    type TestChangeSet = BTreeSet<String>;
 
     #[derive(Debug)]
     struct TestTracker;
@@ -253,7 +254,7 @@ mod test {
     fn open_or_create_new() {
         let temp_dir = tempfile::tempdir().unwrap();
         let file_path = temp_dir.path().join("db_file");
-        let changeset = vec!["hello".to_string(), "world".to_string()];
+        let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
 
         {
             let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
@@ -304,7 +305,7 @@ mod test {
         let mut data = [255_u8; 2000];
         data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
 
-        let changeset = vec!["one".into(), "two".into(), "three!".into()];
+        let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
 
         let mut file = NamedTempFile::new().unwrap();
         file.write_all(&data).expect("should write");
@@ -340,4 +341,73 @@ mod test {
 
         assert_eq!(got_bytes, expected_bytes);
     }
+
+    #[test]
+    fn last_write_is_short() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = [
+            TestChangeSet::from(["1".into()]),
+            TestChangeSet::from(["2".into(), "3".into()]),
+            TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
+        ];
+        let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
+        let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap();
+
+        for short_write_len in 1..last_changeset_bytes.len() - 1 {
+            let file_path = temp_dir.path().join(format!("{}.dat", short_write_len));
+            println!("Test file: {:?}", file_path);
+
+            // simulate creating a file, writing data where the last write is incomplete
+            {
+                let mut db =
+                    Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                for changeset in &changesets {
+                    db.append_changeset(changeset).unwrap();
+                }
+                // this is the incomplete write
+                db.db_file
+                    .write_all(&last_changeset_bytes[..short_write_len])
+                    .unwrap();
+            }
+
+            // load file again and aggregate changesets
+            // write the last changeset again (this time it succeeds)
+            {
+                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let err = db
+                    .aggregate_changesets()
+                    .expect_err("should return error as last read is short");
+                assert_eq!(
+                    err.changeset,
+                    changesets.iter().cloned().reduce(|mut acc, cs| {
+                        Append::append(&mut acc, cs);
+                        acc
+                    }),
+                    "should recover all changesets that are written in full",
+                );
+                db.db_file.write_all(&last_changeset_bytes).unwrap();
+            }
+
+            // load file again - this time we should successfully aggregate all changesets
+            {
+                let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+                let aggregated_changesets = db
+                    .aggregate_changesets()
+                    .expect("aggregating all changesets should succeed");
+                assert_eq!(
+                    aggregated_changesets,
+                    changesets
+                        .iter()
+                        .cloned()
+                        .chain(core::iter::once(last_changeset.clone()))
+                        .reduce(|mut acc, cs| {
+                            Append::append(&mut acc, cs);
+                            acc
+                        }),
+                    "should recover all changesets",
+                );
+            }
+        }
+    }
 }

From 66dc34e75ab1ec2ae533bd540327721f6226eca1 Mon Sep 17 00:00:00 2001
From: LLFourn
Date: Mon, 22 Jan 2024 13:48:48 +1100
Subject: [PATCH 3/4] refactor(file_store): Use BufReader but simplify

---
 crates/file_store/src/entry_iter.rs | 119 +++++++---------------------
 1 file changed, 30 insertions(+), 89 deletions(-)

diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index d95a67f8e..e5e70b3b1 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -7,8 +7,6 @@ use std::{
 
 use crate::bincode_options;
 
-type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
-
 /// Iterator over entries in a file store.
 ///
 /// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
@@ -16,8 +14,9 @@ type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
 /// [`next`]: Self::next
 pub struct EntryIter<'t, T> {
-    db_file: Option<EntryReader<'t>>,
-
+    /// Buffered reader around the file
+    db_file: BufReader<&'t mut File>,
+    finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
     types: PhantomData<T>,
 }
@@ -26,8 +25,9 @@ pub struct EntryIter<'t, T> {
 impl<'t, T> EntryIter<'t, T> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
-            db_file: Some(CountingReader::new(BufReader::new(db_file))),
+            db_file: BufReader::new(db_file),
             start_pos: Some(start_pos),
+            finished: false,
             types: PhantomData,
         }
     }
@@ -40,45 +40,34 @@ where
     type Item = Result<T, IterError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // closure which reads a single entry starting from `self.pos`
-        let read_one =
-            |f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
-                if let Some(pos) = start_pos {
-                    f.seek(io::SeekFrom::Start(pos))?;
-                }
-                match bincode_options().deserialize_from(&mut *f) {
-                    Ok(changeset) => {
-                        f.clear_count();
-                        Ok(Some(changeset))
-                    }
-                    Err(e) => {
-                        // allow unexpected EOF if 0 bytes were read
-                        if let bincode::ErrorKind::Io(inner) = &*e {
-                            if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
-                                f.clear_count();
-                                return Ok(None);
-                            }
-                        }
-                        f.rewind()?;
-                        Err(IterError::Bincode(*e))
-                    }
-                }
-            };
-        let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
-        if result.is_err() {
-            self.db_file = None;
-        }
-        result.transpose()
-    }
-}
-
-impl<'t, T> Drop for EntryIter<'t, T> {
-    fn drop(&mut self) {
-        if let Some(r) = self.db_file.as_mut() {
-            // This syncs the underlying file's offset with the buffer's position. This way, no
-            // data is lost with future reads.
-            let _ = r.stream_position();
-        }
+        if self.finished {
+            return None;
+        }
+        (|| {
+            if let Some(start) = self.start_pos.take() {
+                self.db_file.seek(io::SeekFrom::Start(start))?;
+            }
+
+            let pos_before_read = self.db_file.stream_position()?;
+            match bincode_options().deserialize_from(&mut self.db_file) {
+                Ok(changeset) => Ok(Some(changeset)),
+                Err(e) => {
+                    self.finished = true;
+                    let pos_after_read = self.db_file.stream_position()?;
+                    // allow unexpected EOF if 0 bytes were read
+                    if let bincode::ErrorKind::Io(inner) = &*e {
+                        if inner.kind() == io::ErrorKind::UnexpectedEof
+                            && pos_after_read == pos_before_read
+                        {
+                            return Ok(None);
+                        }
+                    }
+                    self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
+                    Err(IterError::Bincode(*e))
+                }
+            }
+        })()
+        .transpose()
     }
 }
 
@@ -107,51 +96,3 @@ impl From<io::Error> for IterError {
 }
 
 impl std::error::Error for IterError {}
-
-/// A wrapped [`Reader`] which counts total bytes read.
-struct CountingReader<R> {
-    r: R,
-    n: u64,
-}
-
-impl<R> CountingReader<R> {
-    fn new(file: R) -> Self {
-        Self { r: file, n: 0 }
-    }
-
-    /// Counted bytes read.
-    fn count(&self) -> u64 {
-        self.n
-    }
-
-    /// Clear read count.
-    fn clear_count(&mut self) {
-        self.n = 0;
-    }
-}
-
-impl<R: io::Seek> CountingReader<R> {
-    /// Rewind file descriptor offset to before all counted read operations. Then clear the read
-    /// count.
-    fn rewind(&mut self) -> io::Result<u64> {
-        let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
-        self.n = 0;
-        Ok(read)
-    }
-}
-
-impl<R: io::Read> io::Read for CountingReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let read = self.r.read(&mut *buf)?;
-        self.n += read as u64;
-        Ok(read)
-    }
-}
-
-impl<R: io::Seek> io::Seek for CountingReader<R> {
-    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-        let res = self.r.seek(pos);
-        self.n = 0;
-        res
-    }
-}

From 51bd01b3dd3d68c1628d74be9dc6d0f1cf1f15fe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BF=97=E5=AE=87?=
Date: Tue, 23 Jan 2024 12:32:13 +0800
Subject: [PATCH 4/4] fix(file_store): recover file offset after read

Because the `EntryIter` wraps the file in a `BufReader`, we need to sync the
`BufReader`'s position with the file's offset when we drop the `EntryIter`.
Therefore we have a custom `Drop` impl for `EntryIter`.
---
 crates/file_store/src/entry_iter.rs | 10 +++++++
 crates/file_store/src/store.rs      | 46 +++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs
index e5e70b3b1..6be3fd034 100644
--- a/crates/file_store/src/entry_iter.rs
+++ b/crates/file_store/src/entry_iter.rs
@@ -71,6 +71,16 @@ where
     }
 }
 
+impl<'t, T> Drop for EntryIter<'t, T> {
+    fn drop(&mut self) {
+        // This syncs the underlying file's offset with the buffer's position. This way, we
+        // maintain the correct position to start the next read/write.
+        if let Ok(pos) = self.db_file.stream_position() {
+            let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
+        }
+    }
+}
+
 /// Error type for [`EntryIter`].
 #[derive(Debug)]
 pub enum IterError {
diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs
index 83e5272fd..0dc45d28c 100644
--- a/crates/file_store/src/store.rs
+++ b/crates/file_store/src/store.rs
@@ -410,4 +410,50 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn write_after_short_read() {
+        let temp_dir = tempfile::tempdir().unwrap();
+
+        let changesets = (0..20)
+            .map(|n| TestChangeSet::from([format!("{}", n)]))
+            .collect::<Vec<_>>();
+        let last_changeset = TestChangeSet::from(["last".into()]);
+
+        for read_count in 0..changesets.len() {
+            let file_path = temp_dir.path().join(format!("{}.dat", read_count));
+            println!("Test file: {:?}", file_path);
+
+            // First, we create the file with all the changesets!
+            let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            for changeset in &changesets {
+                db.append_changeset(changeset).unwrap();
+            }
+            drop(db);
+
+            // We re-open the file and read `read_count` number of changesets.
+            let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
+            let mut exp_aggregation = db
+                .iter_changesets()
+                .take(read_count)
+                .map(|r| r.expect("must read valid changeset"))
+                .fold(TestChangeSet::default(), |mut acc, v| {
+                    Append::append(&mut acc, v);
+                    acc
+                });
+            // We write after a short read.
+            db.write_changes(&last_changeset)
+                .expect("last write must succeed");
+            Append::append(&mut exp_aggregation, last_changeset.clone());
+            drop(db);
+
+            // We open the file again and check whether aggregate changeset is expected.
+            let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
+                .unwrap()
+                .aggregate_changesets()
+                .expect("must aggregate changesets")
+                .unwrap_or_default();
+            assert_eq!(aggregation, exp_aggregation);
+        }
+    }
 }
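Reviewer note, not part of the patches above: the behaviour PATCH 4/4 compensates for can be reproduced with nothing but the standard library. The sketch below is illustrative only (the scratch-file name, the 1024-byte payload, and the 10-byte read are made-up values): a `BufReader` pulls more bytes from the underlying `File` than the caller consumes, so after a partial read the file's own offset sits past the last consumed entry; seeking the inner file back to the reader's logical `stream_position()` restores the offset that a subsequent append should start from, which is what the new `Drop` impl for `EntryIter` does.

use std::io::{BufReader, Read, Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    // Illustrative scratch file; the name and the 1024-byte payload are arbitrary.
    let path = std::env::temp_dir().join("bufreader_offset_demo.dat");
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&path)?;
    file.write_all(&[0u8; 1024])?;
    file.seek(SeekFrom::Start(0))?;

    // Consume only 10 bytes through the buffered reader...
    let mut reader = BufReader::new(&mut file);
    let mut entry = [0u8; 10];
    reader.read_exact(&mut entry)?;

    // ...but the BufReader may have pulled a whole buffer's worth from the file,
    // so the file's own offset is ahead of what the caller has consumed.
    let logical = reader.stream_position()?; // 10: position of the next unconsumed byte
    let physical = reader.get_mut().stream_position()?; // >= 10: where the file offset really is
    assert!(physical >= logical);

    // Re-sync the file offset to the logical position, as EntryIter's Drop impl does,
    // so that a later append starts right after the last fully consumed entry.
    reader.get_mut().seek(SeekFrom::Start(logical))?;
    drop(reader);
    assert_eq!(file.stream_position()?, 10);

    std::fs::remove_file(&path)
}

The same reasoning is why PATCH 3/4 could drop `CountingReader`: comparing `stream_position()` before and after a failed `deserialize_from` distinguishes a clean EOF (nothing read) from a truncated entry, and seeking back to the pre-read position replaces the byte-count-based rewind.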