Skip to content

Commit 51bd01b

Browse files
committed
fix(file_store): recover file offset after read
Because we use wrap the file with `BufReader` with the `EntryIter`, we need to sync the `BufReader`'s position with the file's offset when we drop the `EntryIter`. Therefore we have a custom drop impl for `EntryIter`.
1 parent 66dc34e commit 51bd01b

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

crates/file_store/src/entry_iter.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,16 @@ where
7171
}
7272
}
7373

74+
impl<'t, T> Drop for EntryIter<'t, T> {
75+
fn drop(&mut self) {
76+
// This syncs the underlying file's offset with the buffer's position. This way, we
77+
// maintain the correct position to start the next read/write.
78+
if let Ok(pos) = self.db_file.stream_position() {
79+
let _ = self.db_file.get_mut().seek(io::SeekFrom::Start(pos));
80+
}
81+
}
82+
}
83+
7484
/// Error type for [`EntryIter`].
7585
#[derive(Debug)]
7686
pub enum IterError {

crates/file_store/src/store.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,4 +410,50 @@ mod test {
410410
}
411411
}
412412
}
413+
414+
#[test]
415+
fn write_after_short_read() {
416+
let temp_dir = tempfile::tempdir().unwrap();
417+
418+
let changesets = (0..20)
419+
.map(|n| TestChangeSet::from([format!("{}", n)]))
420+
.collect::<Vec<_>>();
421+
let last_changeset = TestChangeSet::from(["last".into()]);
422+
423+
for read_count in 0..changesets.len() {
424+
let file_path = temp_dir.path().join(format!("{}.dat", read_count));
425+
println!("Test file: {:?}", file_path);
426+
427+
// First, we create the file with all the changesets!
428+
let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
429+
for changeset in &changesets {
430+
db.append_changeset(changeset).unwrap();
431+
}
432+
drop(db);
433+
434+
// We re-open the file and read `read_count` number of changesets.
435+
let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
436+
let mut exp_aggregation = db
437+
.iter_changesets()
438+
.take(read_count)
439+
.map(|r| r.expect("must read valid changeset"))
440+
.fold(TestChangeSet::default(), |mut acc, v| {
441+
Append::append(&mut acc, v);
442+
acc
443+
});
444+
// We write after a short read.
445+
db.write_changes(&last_changeset)
446+
.expect("last write must succeed");
447+
Append::append(&mut exp_aggregation, last_changeset.clone());
448+
drop(db);
449+
450+
// We open the file again and check whether aggregate changeset is expected.
451+
let aggregation = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path)
452+
.unwrap()
453+
.aggregate_changesets()
454+
.expect("must aggregate changesets")
455+
.unwrap_or_default();
456+
assert_eq!(aggregation, exp_aggregation);
457+
}
458+
}
413459
}

0 commit comments

Comments
 (0)