Skip to content

Commit d71b03e

Browse files
authored
Merge pull request aldanor#198 from Rikorose/feat_io_read
Add ByteReader implementing io::Read and io::Seek
2 parents 268cd29 + db4be9d commit d71b03e

File tree

6 files changed

+215
-3
lines changed

6 files changed

+215
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
- Support for HDF5 version 1.13.0.
88
- Support field renaming via `#[hdf5(rename = "new_name")]` helper attribute.
9+
- Add a `ByteReader` which implements `std::io::{Read, Seek}` for 1D `u8`
10+
datasets. Usage via `Dataset::as_byte_reader()`.
911

1012
### Changed
1113

src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::convert::Infallible;
22
use std::error::Error as StdError;
33
use std::fmt;
4+
use std::io;
45
use std::ops::Deref;
56
use std::panic;
67
use std::ptr;
@@ -268,6 +269,12 @@ impl From<ShapeError> for Error {
268269
}
269270
}
270271

272+
impl From<Error> for io::Error {
273+
fn from(err: Error) -> Self {
274+
Self::new(io::ErrorKind::Other, err)
275+
}
276+
}
277+
271278
pub fn h5check<T: H5ErrorCode>(value: T) -> Result<T> {
272279
H5ErrorCode::h5check(value)
273280
}

src/hl.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use self::{
1717
Attribute, AttributeBuilder, AttributeBuilderData, AttributeBuilderEmpty,
1818
AttributeBuilderEmptyShape,
1919
},
20-
container::{Container, Reader, Writer},
20+
container::{ByteReader, Container, Reader, Writer},
2121
dataset::{
2222
Dataset, DatasetBuilder, DatasetBuilderData, DatasetBuilderEmpty, DatasetBuilderEmptyShape,
2323
},

src/hl/container.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::convert::TryInto;
22
use std::fmt::{self, Debug};
3+
use std::io;
34
use std::mem;
45
use std::ops::Deref;
56

@@ -346,6 +347,99 @@ impl<'a> Writer<'a> {
346347
}
347348
}
348349

350+
#[derive(Debug, Clone)]
351+
pub struct ByteReader {
352+
obj: Container,
353+
pos: u64,
354+
dt: Datatype,
355+
obj_space: Dataspace,
356+
xfer: PropertyList,
357+
}
358+
359+
impl ByteReader {
360+
pub fn new(obj: &Container) -> Result<Self> {
361+
ensure!(!obj.is_attr(), "ByteReader cannot be used on attribute datasets");
362+
363+
let obj = obj.clone();
364+
let file_dtype = obj.dtype()?;
365+
let mem_dtype = Datatype::from_type::<u8>()?;
366+
file_dtype.ensure_convertible(&mem_dtype, Conversion::NoOp)?;
367+
368+
let obj_space = obj.space()?;
369+
ensure!(obj_space.shape().len() == 1, "Only rank 1 datasets can be read via ByteReader");
370+
let xfer = PropertyList::from_id(h5call!(H5Pcreate(*crate::globals::H5P_DATASET_XFER))?)?;
371+
if !hdf5_types::USING_H5_ALLOCATOR {
372+
crate::hl::plist::set_vlen_manager_libc(xfer.id())?;
373+
}
374+
Ok(ByteReader { obj, pos: 0, obj_space, dt: mem_dtype, xfer })
375+
}
376+
377+
fn dataset_len(&self) -> usize {
378+
self.obj_space.shape()[0]
379+
}
380+
381+
fn remaining_len(&self) -> usize {
382+
self.dataset_len().saturating_sub(self.pos as usize)
383+
}
384+
385+
pub fn is_empty(&self) -> bool {
386+
self.pos >= self.dataset_len() as u64
387+
}
388+
}
389+
390+
impl io::Read for ByteReader {
391+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
392+
let pos = self.pos as usize;
393+
let amt = std::cmp::min(buf.len(), self.remaining_len());
394+
let selection = Selection::new(pos..pos + amt);
395+
let out_shape = selection.out_shape(&self.obj_space.shape())?;
396+
let fspace = self.obj_space.select(selection)?;
397+
let mspace = Dataspace::try_new(&out_shape)?;
398+
h5call!(H5Dread(
399+
self.obj.id(),
400+
self.dt.id(),
401+
mspace.id(),
402+
fspace.id(),
403+
self.xfer.id(),
404+
buf.as_mut_ptr().cast()
405+
))?;
406+
self.pos += amt as u64;
407+
Ok(out_shape[0])
408+
}
409+
}
410+
411+
impl io::Seek for ByteReader {
412+
fn seek(&mut self, style: io::SeekFrom) -> io::Result<u64> {
413+
let (base_pos, offset) = match style {
414+
io::SeekFrom::Start(n) => {
415+
self.pos = n;
416+
return Ok(n);
417+
}
418+
io::SeekFrom::End(n) => (self.dataset_len() as u64, n),
419+
io::SeekFrom::Current(n) => (self.pos, n),
420+
};
421+
let new_pos = if offset.is_negative() {
422+
base_pos.checked_sub(offset.wrapping_abs() as u64)
423+
} else {
424+
base_pos.checked_add(offset as u64)
425+
};
426+
match new_pos {
427+
Some(n) => {
428+
self.pos = n;
429+
Ok(self.pos)
430+
}
431+
None => Err(io::Error::new(
432+
io::ErrorKind::InvalidInput,
433+
"invalid seek to a negative or overflowing position",
434+
)),
435+
}
436+
}
437+
438+
fn stream_position(&mut self) -> io::Result<u64> {
439+
Ok(self.pos)
440+
}
441+
}
442+
349443
#[repr(transparent)]
350444
#[derive(Clone)]
351445
/// An object which can be read or written to.
@@ -395,6 +489,14 @@ impl Container {
395489
Writer::new(self)
396490
}
397491

492+
/// Creates `ByteReader` which implements [`Read`](std::io::Read)
493+
/// and [`Seek`](std::io::Seek).
494+
///
495+
/// ``ByteReader`` only supports 1-D `u8` datasets.
496+
pub fn as_byte_reader(&self) -> Result<ByteReader> {
497+
ByteReader::new(&self)
498+
}
499+
398500
/// Returns the datatype of the dataset/attribute.
399501
pub fn dtype(&self) -> Result<Datatype> {
400502
if self.is_attr() {

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ mod export {
5858
hl::selection::{Hyperslab, Selection, SliceOrIndex},
5959
hl::{
6060
Attribute, AttributeBuilder, AttributeBuilderData, AttributeBuilderEmpty,
61-
AttributeBuilderEmptyShape, Container, Conversion, Dataset, DatasetBuilder,
61+
AttributeBuilderEmptyShape, ByteReader, Container, Conversion, Dataset, DatasetBuilder,
6262
DatasetBuilderData, DatasetBuilderEmpty, DatasetBuilderEmptyShape, Dataspace, Datatype,
6363
File, FileBuilder, Group, LinkInfo, LinkType, Location, LocationInfo, LocationToken,
6464
LocationType, Object, PropertyList, Reader, Writer,

tests/test_dataset.rs

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::convert::TryFrom;
22
use std::fmt;
3+
use std::io::{Read, Seek, SeekFrom};
34

45
use ndarray::{s, Array1, Array2, ArrayD, IxDyn, SliceInfo};
56
use rand::prelude::{Rng, SeedableRng, SmallRng};
67

7-
use hdf5_types::TypeDescriptor;
8+
use hdf5_types::{H5Type, TypeDescriptor};
89

910
mod common;
1011

@@ -171,6 +172,85 @@ where
171172
Ok(())
172173
}
173174

175+
fn test_byte_read_seek_impl(ds: &hdf5::Dataset, arr: &ArrayD<u8>, ndim: usize) -> hdf5::Result<()> {
176+
let mut rng = SmallRng::seed_from_u64(42);
177+
ds.write(arr)?;
178+
179+
// Read whole
180+
let reader = ds.as_byte_reader();
181+
let mut reader = if ndim != 1 {
182+
assert!(reader.is_err());
183+
return Ok(());
184+
} else {
185+
reader.unwrap()
186+
};
187+
let mut out_bytes = vec![0u8; arr.len()];
188+
reader.read(&mut out_bytes.as_mut_slice()).expect("io::Read failed");
189+
assert_eq!(out_bytes.as_slice(), arr.as_slice().unwrap());
190+
191+
// Read in chunks
192+
let mut reader = reader.clone();
193+
reader.seek(std::io::SeekFrom::Start(0)).expect("io::Seek failed");
194+
let mut pos = 0;
195+
while pos < arr.len() {
196+
let chunk_len: usize = rng.gen_range(1..arr.len() + 1);
197+
let mut chunk = vec![0u8; chunk_len];
198+
let n_read = reader.read(&mut chunk).expect("io::Read failed");
199+
if pos + chunk_len < arr.len() {
200+
// We did not read until end. Thus, the chunk should be fully filled.
201+
assert_eq!(chunk_len, n_read);
202+
}
203+
assert_eq!(&chunk[..n_read], arr.slice(s![pos..pos + n_read]).as_slice().unwrap());
204+
pos += chunk_len;
205+
}
206+
207+
// Seek to the begining and read again
208+
reader.seek(SeekFrom::Start(0)).expect("io::Seek failed");
209+
let mut out_bytes = vec![0u8; arr.len()];
210+
reader.read(&mut out_bytes.as_mut_slice()).expect("io::Read failed");
211+
assert_eq!(out_bytes.as_slice(), arr.as_slice().unwrap());
212+
213+
// Seek to a random position from start
214+
let pos = rng.gen_range(0..arr.len() + 1) as u64;
215+
let seeked_pos = reader.seek(SeekFrom::Start(pos)).expect("io::Seek failed") as usize;
216+
let mut out_bytes = vec![0u8; arr.len() - seeked_pos];
217+
reader.read(&mut out_bytes.as_mut_slice()).expect("io::Read failed");
218+
assert_eq!(out_bytes.as_slice(), arr.slice(s![seeked_pos..]).as_slice().unwrap());
219+
220+
// Seek from current position
221+
let orig_pos = reader.seek(SeekFrom::Start(pos)).expect("io::Seek failed") as i64;
222+
let rel_pos = rng.gen_range(-(arr.len() as i64)..arr.len() as i64 + 1);
223+
let pos_res = reader.seek(SeekFrom::Current(rel_pos));
224+
if (rel_pos + orig_pos) < 0 {
225+
assert!(pos_res.is_err()) // We cannot seek before start
226+
} else {
227+
let seeked_pos = pos_res.unwrap() as usize;
228+
assert_eq!(rel_pos + orig_pos, seeked_pos as i64);
229+
let mut out_bytes = vec![0u8; arr.len() - seeked_pos];
230+
reader.read(&mut out_bytes.as_mut_slice()).expect("io::Read failed");
231+
assert_eq!(out_bytes.as_slice(), arr.slice(s![seeked_pos..]).as_slice().unwrap());
232+
}
233+
234+
// Seek to a random position from end
235+
let pos = -(rng.gen_range(0..arr.len() + 1) as i64);
236+
let seeked_pos = reader.seek(SeekFrom::End(pos)).expect("io::Seek failed") as usize;
237+
assert_eq!(pos, seeked_pos as i64 - arr.len() as i64);
238+
let mut out_bytes = vec![0u8; arr.len() - seeked_pos];
239+
reader.read(&mut out_bytes.as_mut_slice()).expect("io::Read failed");
240+
assert_eq!(out_bytes.as_slice(), arr.slice(s![seeked_pos..]).as_slice().unwrap());
241+
242+
// Seek before start
243+
assert!(reader.seek(SeekFrom::End(-(arr.len() as i64) - 1)).is_err());
244+
245+
// Test stream position start
246+
// Requires Rust 1.55.0: reader.rewind().expect("io::Seek::rewind failed");
247+
assert_eq!(0, reader.seek(SeekFrom::Start(0)).unwrap());
248+
assert_eq!(0, reader.stream_position().unwrap());
249+
assert_eq!(0, reader.seek(SeekFrom::End(-(arr.len() as i64))).unwrap());
250+
assert_eq!(0, reader.stream_position().unwrap());
251+
Ok(())
252+
}
253+
174254
fn test_read_write<T>() -> hdf5::Result<()>
175255
where
176256
T: hdf5::H5Type + fmt::Debug + PartialEq + Gen + Clone,
@@ -278,3 +358,24 @@ fn test_read_write_rename_fields() -> hdf5::Result<()> {
278358
test_read_write::<RenameEnum>()?;
279359
Ok(())
280360
}
361+
362+
#[test]
363+
fn test_byte_read_seek() -> hdf5::Result<()> {
364+
let mut rng = SmallRng::seed_from_u64(42);
365+
let file = new_in_memory_file()?;
366+
367+
for ndim in 0..=2 {
368+
for _ in 0..=20 {
369+
let arr: ArrayD<u8> = gen_arr(&mut rng, ndim);
370+
371+
let ds: hdf5::Dataset = file.new_dataset::<u8>().shape(arr.shape()).create("x")?;
372+
let ds = scopeguard::guard(ds, |ds| {
373+
drop(ds);
374+
drop(file.unlink("x"));
375+
});
376+
377+
test_byte_read_seek_impl(&ds, &arr, ndim)?;
378+
}
379+
}
380+
Ok(())
381+
}

0 commit comments

Comments
 (0)