feat: Extract FileRead and FileWrite trait #364

Merged
merged 6 commits on May 14, 2024
12 changes: 6 additions & 6 deletions Cargo.toml
@@ -18,11 +18,11 @@
[workspace]
resolver = "2"
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
]

[workspace.package]
@@ -64,7 +64,7 @@ log = "^0.4"
mockito = "^1"
murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.45"
opendal = "0.46"
ordered-float = "4.0.0"
parquet = "51"
pilota = "0.11.0"
1 change: 1 addition & 0 deletions crates/catalog/glue/Cargo.toml
@@ -42,4 +42,5 @@ uuid = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }
15 changes: 5 additions & 10 deletions crates/catalog/glue/src/catalog.rs
@@ -25,7 +25,6 @@ use iceberg::{
TableIdent,
};
use std::{collections::HashMap, fmt::Debug};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use typed_builder::TypedBuilder;

@@ -358,13 +357,10 @@ impl Catalog for GlueCatalog {
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;

let mut file = self
.file_io
self.file_io
.new_output(&metadata_location)?
.writer()
.write(serde_json::to_vec(&metadata)?.into())
.await?;
file.write_all(&serde_json::to_vec(&metadata)?).await?;
file.shutdown().await?;

let glue_table = convert_to_glue_table(
&table_name,
@@ -431,10 +427,9 @@ impl Catalog for GlueCatalog {
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;

let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
let input_file = self.file_io.new_input(&metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
.file_io(self.file_io())
1 change: 1 addition & 0 deletions crates/catalog/hms/Cargo.toml
@@ -44,4 +44,5 @@ volo-thrift = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }
15 changes: 4 additions & 11 deletions crates/catalog/hms/src/catalog.rs
@@ -35,8 +35,6 @@ use iceberg::{
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;

@@ -349,13 +347,10 @@ impl Catalog for HmsCatalog {
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;

let mut file = self
.file_io
self.file_io
.new_output(&metadata_location)?
.writer()
.write(serde_json::to_vec(&metadata)?.into())
.await?;
file.write_all(&serde_json::to_vec(&metadata)?).await?;
file.shutdown().await?;

let hive_table = convert_to_hive_table(
db_name.clone(),
@@ -406,10 +401,8 @@ impl Catalog for HmsCatalog {

let metadata_location = get_metadata_location(&hive_table.parameters)?;

let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
.file_io(self.file_io())
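Note: both catalog diffs (Glue above and HMS here) make the same substitution, dropping the tokio `AsyncReadExt`/`AsyncWriteExt` plumbing in favor of the new one-shot helpers. A minimal sketch of the shared pattern, assuming `file_io: FileIO`, a `metadata: TableMetadata`, and a `metadata_location: String` are already in scope:

```rust
// Write table metadata in one call (previously: writer() + write_all() + shutdown()).
file_io
    .new_output(&metadata_location)?
    .write(serde_json::to_vec(&metadata)?.into())
    .await?;

// Read table metadata back in one call (previously: reader() + read_to_string()).
let metadata_content = file_io.new_input(&metadata_location)?.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
```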
1 change: 1 addition & 0 deletions crates/catalog/rest/Cargo.toml
@@ -46,5 +46,6 @@ uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
opendal = { workspace = true, features = ["services-fs"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
59 changes: 53 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -19,14 +19,21 @@

use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{try_join, TryFutureExt};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::SchemaDescriptor;
use std::collections::HashMap;
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use crate::arrow::arrow_schema_to_schema;
use crate::io::FileIO;
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;
use crate::{Error, ErrorKind};
@@ -91,12 +98,12 @@ impl ArrowReader {

Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_reader = file_io
.new_input(task.data().data_file().file_path())?
.reader()
.await?;
let parquet_file = file_io
.new_input(task.data().data_file().file_path())?;
let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
@@ -187,3 +194,43 @@ impl ArrowReader {
}
}
}

/// ArrowFileReader is a wrapper around a FileRead that implements parquet's AsyncFileReader.
///
/// # TODO
///
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) contains the following hints to speed up metadata loading; we can consider adding them to this struct:
///
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
struct ArrowFileReader<R: FileRead> {
meta: FileMetadata,
r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
/// Create a new ArrowFileReader
fn new(meta: FileMetadata, r: R) -> Self {
Self { meta, r }
}
}

impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(
self.r
.read(range.start as _..range.end as _)
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
)
}

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let mut loader = MetadataLoader::load(self, file_size as usize, None).await?;
loader.load_page_index(false, false).await?;
Ok(Arc::new(loader.finish()))
})
}
}
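`ArrowFileReader` is private to the crate, so the snippet below is only a sketch of how the stream body earlier in this diff uses it; `file_io` and `path` are placeholders and error handling is elided.

```rust
// Open the data file, then fetch its metadata and a ranged reader concurrently.
let parquet_file = file_io.new_input(path)?;
let (parquet_metadata, parquet_reader) =
    try_join!(parquet_file.metadata(), parquet_file.reader())?;

// Adapt the FileMetadata + FileRead pair into parquet's AsyncFileReader.
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

// Hand the adapter to the Arrow record-batch stream builder and consume batches.
let batch_stream_builder =
    ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
let mut batch_stream = batch_stream_builder.build()?;
while let Some(batch) = batch_stream.next().await {
    let batch = batch?;
    // ... project, filter, or forward the RecordBatch ...
}
```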
119 changes: 100 additions & 19 deletions crates/iceberg/src/io.rs
@@ -48,14 +48,13 @@
//! - `new_input`: Create input file for reading.
//! - `new_output`: Create output file for writing.

use bytes::Bytes;
use std::ops::Range;
use std::{collections::HashMap, sync::Arc};

use crate::{error::Result, Error, ErrorKind};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
use tokio::io::AsyncWrite as TokioAsyncWrite;
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;

/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -206,6 +205,35 @@ impl FileIO {
}
}

/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
pub struct FileMetadata {
/// The size of the file.
pub size: u64,
}

/// Trait for reading files.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileRead: Send + Unpin + 'static {
/// Read file content with given range.
///
/// TODO: we can support reading non-contiguous bytes in the future.
async fn read(&self, range: Range<u64>) -> Result<Bytes>;
}

#[async_trait::async_trait]
impl FileRead for opendal::Reader {
async fn read(&self, range: Range<u64>) -> Result<Bytes> {
Ok(opendal::Reader::read(self, range).await?.to_bytes())
}
}

/// Input file is used for reading from files.
#[derive(Debug)]
pub struct InputFile {
@@ -216,14 +244,6 @@ pub struct InputFile {
relative_path_pos: usize,
}

/// Trait for reading file.
pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}

impl<T> FileRead for T where
T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
{
}

impl InputFile {
/// Absolute path to root uri.
pub fn location(&self) -> &str {
@@ -238,16 +258,63 @@ impl InputFile {
.await?)
}

/// Creates [`InputStream`] for reading.
/// Fetches and returns the metadata of the file.
pub async fn metadata(&self) -> Result<FileMetadata> {
let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;

Ok(FileMetadata {
size: meta.content_length(),
})
}

/// Reads and returns the whole content of the file.
///
/// For continuous reading, use [`Self::reader`] instead.
pub async fn read(&self) -> Result<Bytes> {
Ok(self
.op
.read(&self.path[self.relative_path_pos..])
.await?
.to_bytes())
}

/// Creates [`FileRead`] for continuous reading.
///
/// For one-time reading, use [`Self::read`] instead.
pub async fn reader(&self) -> Result<impl FileRead> {
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
}
}

/// Trait for writing files.
pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
/// Write bytes to file.
///
/// TODO: we can support writing non-contiguous bytes in the future.
async fn write(&mut self, bs: Bytes) -> Result<()>;

impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
/// Close file.
///
/// Calling `close` on a closed file will generate an error.
async fn close(&mut self) -> Result<()>;
}

#[async_trait::async_trait]
impl FileWrite for opendal::Writer {
async fn write(&mut self, bs: Bytes) -> Result<()> {
Ok(opendal::Writer::write(self, bs).await?)
}

async fn close(&mut self) -> Result<()> {
Ok(opendal::Writer::close(self).await?)
}
}

/// Output file is used for writing to files.
#[derive(Debug)]
@@ -282,7 +349,23 @@ impl OutputFile {
}
}

/// Creates output file for writing.
/// Create a new output file with given bytes.
///
/// # Notes
///
/// Calling `write` will overwrite the file if it exists.
/// For continuous writing, use [`Self::writer`].
pub async fn write(&self, bs: Bytes) -> Result<()> {
let mut writer = self.writer().await?;
writer.write(bs).await?;
writer.close().await
}

/// Creates output file for continuous writing.
///
/// # Notes
///
/// For one-time writing, use [`Self::write`] instead.
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
@@ -398,7 +481,7 @@ mod tests {
use std::{fs::File, path::Path};

use futures::io::AllowStdIo;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::AsyncReadExt;

use tempfile::TempDir;

@@ -483,9 +566,7 @@

assert!(!output_file.exists().await.unwrap());
{
let mut writer = output_file.writer().await.unwrap();
writer.write_all(content.as_bytes()).await.unwrap();
writer.close().await.unwrap();
output_file.write(content.into()).await.unwrap();
}

assert_eq!(&full_path, output_file.location());
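For reference, a round trip through the new `InputFile`/`OutputFile` surface might look like the sketch below. The local-filesystem builder (`FileIOBuilder::new_fs_io()`) and the `/tmp` path are assumptions borrowed from the crate's tests, not part of this diff.

```rust
use bytes::Bytes;
use iceberg::io::{FileIOBuilder, FileRead};

async fn round_trip() -> iceberg::Result<()> {
    // Assumed: a FileIO backed by the local filesystem.
    let file_io = FileIOBuilder::new_fs_io().build()?;
    let path = "/tmp/iceberg-demo.metadata.json";

    // One-shot write: creates (or overwrites) the file with the given bytes.
    file_io
        .new_output(path)?
        .write(Bytes::from_static(b"{\"hello\":\"iceberg\"}"))
        .await?;

    // One-shot read: returns the whole file as Bytes.
    let input = file_io.new_input(path)?;
    let content = input.read().await?;
    assert_eq!(&content[..], &b"{\"hello\":\"iceberg\"}"[..]);

    // Ranged read through the new FileRead trait: only the first 8 bytes.
    let reader = input.reader().await?;
    let head = reader.read(0..8).await?;
    assert_eq!(head.len(), 8);

    Ok(())
}
```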