Skip to content

Commit 345ca7e

Browse files
Xuanwo authored and shaeqahmed committed
feat: Extract FileRead and FileWrite trait (apache#364)
* feat: Extract FileRead and FileWrite trait Signed-off-by: Xuanwo <[email protected]> * Enable s3 services for tests Signed-off-by: Xuanwo <[email protected]> * Fix sort Signed-off-by: Xuanwo <[email protected]> * Add comment for io trait Signed-off-by: Xuanwo <[email protected]> * Fix test for rest Signed-off-by: Xuanwo <[email protected]> * Use try join Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent dbd6048 commit 345ca7e

File tree

17 files changed

+315
-149
lines changed

17 files changed

+315
-149
lines changed

Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
[workspace]
1919
resolver = "2"
2020
members = [
21-
"crates/catalog/*",
22-
"crates/examples",
23-
"crates/iceberg",
24-
"crates/integrations/*",
25-
"crates/test_utils",
21+
"crates/catalog/*",
22+
"crates/examples",
23+
"crates/iceberg",
24+
"crates/integrations/*",
25+
"crates/test_utils",
2626
]
2727

2828
[workspace.package]
@@ -64,7 +64,7 @@ log = "^0.4"
6464
mockito = "^1"
6565
murmur3 = "0.5.2"
6666
once_cell = "1"
67-
opendal = "0.45"
67+
opendal = "0.46"
6868
ordered-float = "4.0.0"
6969
parquet = "51"
7070
pilota = "0.11.0"

crates/catalog/glue/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ uuid = { workspace = true }
4242

4343
[dev-dependencies]
4444
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
45+
opendal = { workspace = true, features = ["services-s3"] }
4546
port_scanner = { workspace = true }

crates/catalog/glue/src/catalog.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use iceberg::{
2525
TableIdent,
2626
};
2727
use std::{collections::HashMap, fmt::Debug};
28-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2928

3029
use typed_builder::TypedBuilder;
3130

@@ -358,13 +357,10 @@ impl Catalog for GlueCatalog {
358357
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
359358
let metadata_location = create_metadata_location(&location, 0)?;
360359

361-
let mut file = self
362-
.file_io
360+
self.file_io
363361
.new_output(&metadata_location)?
364-
.writer()
362+
.write(serde_json::to_vec(&metadata)?.into())
365363
.await?;
366-
file.write_all(&serde_json::to_vec(&metadata)?).await?;
367-
file.shutdown().await?;
368364

369365
let glue_table = convert_to_glue_table(
370366
&table_name,
@@ -431,10 +427,9 @@ impl Catalog for GlueCatalog {
431427
Some(table) => {
432428
let metadata_location = get_metadata_location(&table.parameters)?;
433429

434-
let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
435-
let mut metadata_str = String::new();
436-
reader.read_to_string(&mut metadata_str).await?;
437-
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
430+
let input_file = self.file_io.new_input(&metadata_location)?;
431+
let metadata_content = input_file.read().await?;
432+
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
438433

439434
let table = Table::builder()
440435
.file_io(self.file_io())

crates/catalog/hms/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ volo-thrift = { workspace = true }
4444

4545
[dev-dependencies]
4646
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
47+
opendal = { workspace = true, features = ["services-s3"] }
4748
port_scanner = { workspace = true }

crates/catalog/hms/src/catalog.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ use iceberg::{
3535
use std::collections::HashMap;
3636
use std::fmt::{Debug, Formatter};
3737
use std::net::ToSocketAddrs;
38-
use tokio::io::AsyncReadExt;
39-
use tokio::io::AsyncWriteExt;
4038
use typed_builder::TypedBuilder;
4139
use volo_thrift::ResponseError;
4240

@@ -349,13 +347,10 @@ impl Catalog for HmsCatalog {
349347
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
350348
let metadata_location = create_metadata_location(&location, 0)?;
351349

352-
let mut file = self
353-
.file_io
350+
self.file_io
354351
.new_output(&metadata_location)?
355-
.writer()
352+
.write(serde_json::to_vec(&metadata)?.into())
356353
.await?;
357-
file.write_all(&serde_json::to_vec(&metadata)?).await?;
358-
file.shutdown().await?;
359354

360355
let hive_table = convert_to_hive_table(
361356
db_name.clone(),
@@ -406,10 +401,8 @@ impl Catalog for HmsCatalog {
406401

407402
let metadata_location = get_metadata_location(&hive_table.parameters)?;
408403

409-
let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
410-
let mut metadata_str = String::new();
411-
reader.read_to_string(&mut metadata_str).await?;
412-
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
404+
let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
405+
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
413406

414407
let table = Table::builder()
415408
.file_io(self.file_io())

crates/catalog/rest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,6 @@ uuid = { workspace = true, features = ["v4"] }
4646
[dev-dependencies]
4747
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
4848
mockito = { workspace = true }
49+
opendal = { workspace = true, features = ["services-fs"] }
4950
port_scanner = { workspace = true }
5051
tokio = { workspace = true }

crates/iceberg/src/arrow/reader.rs

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,21 @@
1919
2020
use arrow_schema::SchemaRef as ArrowSchemaRef;
2121
use async_stream::try_stream;
22+
use bytes::Bytes;
23+
use futures::future::BoxFuture;
2224
use futures::stream::StreamExt;
25+
use futures::{try_join, TryFutureExt};
26+
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
2327
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
28+
use parquet::file::metadata::ParquetMetaData;
2429
use parquet::schema::types::SchemaDescriptor;
2530
use std::collections::HashMap;
31+
use std::ops::Range;
2632
use std::str::FromStr;
33+
use std::sync::Arc;
2734

2835
use crate::arrow::arrow_schema_to_schema;
29-
use crate::io::FileIO;
36+
use crate::io::{FileIO, FileMetadata, FileRead};
3037
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
3138
use crate::spec::SchemaRef;
3239
use crate::{Error, ErrorKind};
@@ -91,12 +98,12 @@ impl ArrowReader {
9198

9299
Ok(try_stream! {
93100
while let Some(Ok(task)) = tasks.next().await {
94-
let parquet_reader = file_io
95-
.new_input(task.data().data_file().file_path())?
96-
.reader()
97-
.await?;
101+
let parquet_file = file_io
102+
.new_input(task.data().data_file().file_path())?;
103+
let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
104+
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
98105

99-
let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
106+
let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
100107
.await?;
101108

102109
let parquet_schema = batch_stream_builder.parquet_schema();
@@ -187,3 +194,43 @@ impl ArrowReader {
187194
}
188195
}
189196
}
197+
198+
/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
199+
///
200+
/// # TODO
201+
///
202+
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) contains the following hints to speed up metadata loading, we can consider adding them to this struct:
203+
///
204+
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
205+
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
206+
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
207+
struct ArrowFileReader<R: FileRead> {
208+
meta: FileMetadata,
209+
r: R,
210+
}
211+
212+
impl<R: FileRead> ArrowFileReader<R> {
213+
/// Create a new ArrowFileReader
214+
fn new(meta: FileMetadata, r: R) -> Self {
215+
Self { meta, r }
216+
}
217+
}
218+
219+
impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
220+
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
221+
Box::pin(
222+
self.r
223+
.read(range.start as _..range.end as _)
224+
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
225+
)
226+
}
227+
228+
fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
229+
Box::pin(async move {
230+
let file_size = self.meta.size;
231+
let mut loader = MetadataLoader::load(self, file_size as usize, None).await?;
232+
loader.load_page_index(false, false).await?;
233+
Ok(Arc::new(loader.finish()))
234+
})
235+
}
236+
}

crates/iceberg/src/io.rs

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,13 @@
4848
//! - `new_input`: Create input file for reading.
4949
//! - `new_output`: Create output file for writing.
5050
51+
use bytes::Bytes;
52+
use std::ops::Range;
5153
use std::{collections::HashMap, sync::Arc};
5254

5355
use crate::{error::Result, Error, ErrorKind};
54-
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
5556
use once_cell::sync::Lazy;
5657
use opendal::{Operator, Scheme};
57-
use tokio::io::AsyncWrite as TokioAsyncWrite;
58-
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
5958
use url::Url;
6059

6160
/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -206,6 +205,35 @@ impl FileIO {
206205
}
207206
}
208207

208+
/// The struct the represents the metadata of a file.
209+
///
210+
/// TODO: we can add last modified time, content type, etc. in the future.
211+
pub struct FileMetadata {
212+
/// The size of the file.
213+
pub size: u64,
214+
}
215+
216+
/// Trait for reading file.
217+
///
218+
/// # TODO
219+
///
220+
/// It's possible for us to remove the async_trait, but we need to figure
221+
/// out how to handle the object safety.
222+
#[async_trait::async_trait]
223+
pub trait FileRead: Send + Unpin + 'static {
224+
/// Read file content with given range.
225+
///
226+
/// TODO: we can support reading non-contiguous bytes in the future.
227+
async fn read(&self, range: Range<u64>) -> Result<Bytes>;
228+
}
229+
230+
#[async_trait::async_trait]
231+
impl FileRead for opendal::Reader {
232+
async fn read(&self, range: Range<u64>) -> Result<Bytes> {
233+
Ok(opendal::Reader::read(self, range).await?.to_bytes())
234+
}
235+
}
236+
209237
/// Input file is used for reading from files.
210238
#[derive(Debug)]
211239
pub struct InputFile {
@@ -216,14 +244,6 @@ pub struct InputFile {
216244
relative_path_pos: usize,
217245
}
218246

219-
/// Trait for reading file.
220-
pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}
221-
222-
impl<T> FileRead for T where
223-
T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
224-
{
225-
}
226-
227247
impl InputFile {
228248
/// Absolute path to root uri.
229249
pub fn location(&self) -> &str {
@@ -238,16 +258,63 @@ impl InputFile {
238258
.await?)
239259
}
240260

241-
/// Creates [`InputStream`] for reading.
261+
/// Fetch and returns metadata of file.
262+
pub async fn metadata(&self) -> Result<FileMetadata> {
263+
let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
264+
265+
Ok(FileMetadata {
266+
size: meta.content_length(),
267+
})
268+
}
269+
270+
/// Read and returns whole content of file.
271+
///
272+
/// For continues reading, use [`Self::reader`] instead.
273+
pub async fn read(&self) -> Result<Bytes> {
274+
Ok(self
275+
.op
276+
.read(&self.path[self.relative_path_pos..])
277+
.await?
278+
.to_bytes())
279+
}
280+
281+
/// Creates [`FileRead`] for continues reading.
282+
///
283+
/// For one-time reading, use [`Self::read`] instead.
242284
pub async fn reader(&self) -> Result<impl FileRead> {
243285
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
244286
}
245287
}
246288

247289
/// Trait for writing file.
248-
pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
290+
///
291+
/// # TODO
292+
///
293+
/// It's possible for us to remove the async_trait, but we need to figure
294+
/// out how to handle the object safety.
295+
#[async_trait::async_trait]
296+
pub trait FileWrite: Send + Unpin + 'static {
297+
/// Write bytes to file.
298+
///
299+
/// TODO: we can support writing non-contiguous bytes in the future.
300+
async fn write(&mut self, bs: Bytes) -> Result<()>;
249301

250-
impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
302+
/// Close file.
303+
///
304+
/// Calling close on closed file will generate an error.
305+
async fn close(&mut self) -> Result<()>;
306+
}
307+
308+
#[async_trait::async_trait]
309+
impl FileWrite for opendal::Writer {
310+
async fn write(&mut self, bs: Bytes) -> Result<()> {
311+
Ok(opendal::Writer::write(self, bs).await?)
312+
}
313+
314+
async fn close(&mut self) -> Result<()> {
315+
Ok(opendal::Writer::close(self).await?)
316+
}
317+
}
251318

252319
/// Output file is used for writing to files..
253320
#[derive(Debug)]
@@ -282,7 +349,23 @@ impl OutputFile {
282349
}
283350
}
284351

285-
/// Creates output file for writing.
352+
/// Create a new output file with given bytes.
353+
///
354+
/// # Notes
355+
///
356+
/// Calling `write` will overwrite the file if it exists.
357+
/// For continues writing, use [`Self::writer`].
358+
pub async fn write(&self, bs: Bytes) -> Result<()> {
359+
let mut writer = self.writer().await?;
360+
writer.write(bs).await?;
361+
writer.close().await
362+
}
363+
364+
/// Creates output file for continues writing.
365+
///
366+
/// # Notes
367+
///
368+
/// For one-time writing, use [`Self::write`] instead.
286369
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
287370
Ok(Box::new(
288371
self.op.writer(&self.path[self.relative_path_pos..]).await?,
@@ -398,7 +481,7 @@ mod tests {
398481
use std::{fs::File, path::Path};
399482

400483
use futures::io::AllowStdIo;
401-
use futures::{AsyncReadExt, AsyncWriteExt};
484+
use futures::AsyncReadExt;
402485

403486
use tempfile::TempDir;
404487

@@ -483,9 +566,7 @@ mod tests {
483566

484567
assert!(!output_file.exists().await.unwrap());
485568
{
486-
let mut writer = output_file.writer().await.unwrap();
487-
writer.write_all(content.as_bytes()).await.unwrap();
488-
writer.close().await.unwrap();
569+
output_file.write(content.into()).await.unwrap();
489570
}
490571

491572
assert_eq!(&full_path, output_file.location());

0 commit comments

Comments
 (0)