diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
new file mode 100644
index 000000000..663b7e77c
--- /dev/null
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use futures::channel::oneshot;
+use futures::future::join_all;
+use futures::{StreamExt, TryStreamExt};
+use tokio::sync::oneshot::{channel, Receiver};
+
+use super::delete_filter::{DeleteFilter, EqDelFuture};
+use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::ArrowReader;
+use crate::delete_vector::DeleteVector;
+use crate::expr::Predicate;
+use crate::io::FileIO;
+use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
+use crate::spec::{DataContentType, Schema, SchemaRef};
+use crate::{Error, ErrorKind, Result};
+
+#[allow(unused)]
+pub trait DeleteFileLoader {
+    /// Read the delete file referred to in the task
+    ///
+    /// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution.
+    async fn read_delete_file(
+        &self,
+        task: &FileScanTaskDeleteFile,
+        schema: SchemaRef,
+    ) -> Result<ArrowRecordBatchStream>;
+}
+
+#[allow(unused)]
+#[derive(Clone, Debug)]
+pub(crate) struct CachingDeleteFileLoader {
+    file_io: FileIO,
+    concurrency_limit_data_files: usize,
+    del_filter: DeleteFilter,
+}
+
+impl DeleteFileLoader for CachingDeleteFileLoader {
+    async fn read_delete_file(
+        &self,
+        task: &FileScanTaskDeleteFile,
+        schema: SchemaRef,
+    ) -> Result<ArrowRecordBatchStream> {
+        let raw_batch_stream =
+            CachingDeleteFileLoader::parquet_to_batch_stream(&task.file_path, self.file_io.clone())
+                .await?;
+
+        Self::evolve_schema(raw_batch_stream, schema).await
+    }
+}
+
+// Intermediate context during processing of a delete file task.
+enum DeleteFileContext {
+    // TODO: Delete Vector loader from Puffin files
+    InProgEqDel(EqDelFuture),
+    PosDels(ArrowRecordBatchStream),
+    FreshEqDel {
+        batch_stream: ArrowRecordBatchStream,
+        sender: oneshot::Sender<Predicate>,
+    },
+}
+
+// Final result of the processing of a delete file task before
+// results are fully merged into the DeleteFilter's state
+enum ParsedDeleteFileContext {
+    InProgEqDel(EqDelFuture),
+    DelVecs(HashMap<String, DeleteVector>),
+    EqDel,
+}
+
+#[allow(unused_variables)]
+impl CachingDeleteFileLoader {
+    pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
+        CachingDeleteFileLoader {
+            file_io,
+            concurrency_limit_data_files,
+            del_filter: DeleteFilter::default(),
+        }
+    }
+
+    /// Load the deletes for all the specified tasks
+    ///
+    /// Returned future completes once all loading has finished.
+    ///
+    /// * Create a single stream of all delete file tasks irrespective of type,
+    ///   so that we can respect the combined concurrency limit.
+    /// * We then process each in two phases: load and parse.
+    /// * For positional deletes the load phase instantiates an ArrowRecordBatchStream to
+    ///   stream the file contents out.
+    /// * For eq deletes, we first check if the EQ delete is already loaded or being loaded by
+    ///   another concurrently processing data file scan task. If it is, we return a future
+    ///   for the pre-existing task from the load phase. If not, we create such a future
+    ///   and store it in the state to prevent other data file tasks from starting to load
+    ///   the same equality delete file, and return a record batch stream from the load phase
+    ///   as per the other delete file types - only this time it is accompanied by a one-shot
+    ///   channel sender that we will eventually use to resolve the shared future that we stored
+    ///   in the state.
+    /// * When this gets updated to add support for delete vectors, the load phase will return
+    ///   a PuffinReader for them.
+    /// * The parse phase parses each record batch stream according to its associated data type.
+    ///   The result of this is a map of data file paths to delete vectors for the positional
+    ///   delete tasks (and in future for the delete vector tasks). For equality delete
+    ///   file tasks, this results in an unbound Predicate.
+    /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
+    ///   channel to store them in the right place in the delete file manager's state.
+    /// * The results of all of these futures are awaited on in parallel with the specified
+    ///   level of concurrency and collected into a vec. We then combine all the delete
+    ///   vector maps that resulted from any positional delete or delete vector files into a
+    ///   single map and persist it in the state.
+    ///
+    ///
+    /// Conceptually, the data flow is like this:
+    /// ```none
+    ///                                          FileScanTaskDeleteFile
+    ///                                                     |
+    ///                Already-loading EQ Delete            |            Everything Else
+    ///                     +---------------------------------------------------+
+    ///                     |                                                   |
+    ///           [get existing future]                   [load recordbatch stream / puffin]
+    ///       DeleteFileContext::InProgEqDel                       DeleteFileContext
+    ///                     |                                                   |
+    ///                     |                                                   |
+    ///                     |               +-----------------------------------+--------------------------+
+    ///                     |            Pos Del               Del Vec (Not yet Implemented)             EQ Del
+    ///                     |               |                              |                                |
+    ///                     |     [parse pos del stream]         [parse del vec puffin]              [parse eq del]
+    ///                     | HashMap<String, DeleteVector>  HashMap<String, DeleteVector>       (Predicate, Sender)
+    ///                     |               |                              |                                |
+    ///                     |               |                              |                      [persist to state]
+    ///                     |               |                              |                                ()
+    ///                     |               |                              |                                |
+    ///                     |               +-----------------------------------+--------------------------+
+    ///                     |                                                   |
+    ///                     |                                          [buffer unordered]
+    ///                     |                                                   |
+    ///                     |                                         [combine del vectors]
+    ///                     |                                     HashMap<String, DeleteVector>
+    ///                     |                                                   |
+    ///                     |                                  [persist del vectors to state]
+    ///                     |                                                   ()
+    ///                     |                                                   |
+    ///                     +-------------------------+-------------------------+
+    ///                                               |
+    ///                                            [join!]
+ /// ``` + pub(crate) fn load_deletes( + &self, + delete_file_entries: &[FileScanTaskDeleteFile], + schema: SchemaRef, + ) -> Receiver> { + let (tx, rx) = channel(); + + let stream_items = delete_file_entries + .iter() + .map(|t| { + ( + t.clone(), + self.file_io.clone(), + self.del_filter.clone(), + schema.clone(), + ) + }) + .collect::>(); + let task_stream = futures::stream::iter(stream_items); + let del_filter = self.del_filter.clone(); + let concurrency_limit_data_files = self.concurrency_limit_data_files; + crate::runtime::spawn(async move { + let result = async move { + let mut del_filter = del_filter; + + let results: Vec = task_stream + .map(move |(task, file_io, del_filter, schema)| async move { + Self::load_file_for_task(&task, file_io, del_filter, schema).await + }) + .map(move |ctx| { + Ok(async { Self::parse_file_content_for_task(ctx.await?).await }) + }) + .try_buffer_unordered(concurrency_limit_data_files) + .try_collect::>() + .await?; + + // wait for all in-progress EQ deletes from other tasks + let _ = join_all(results.iter().filter_map(|i| { + if let ParsedDeleteFileContext::InProgEqDel(fut) = i { + Some(fut.clone()) + } else { + None + } + })) + .await; + + for item in results { + if let ParsedDeleteFileContext::DelVecs(hash_map) = item { + for (data_file_path, delete_vector) in hash_map.into_iter() { + del_filter.upsert_delete_vector(data_file_path, delete_vector); + } + } + } + + Ok(del_filter) + } + .await; + + let _ = tx.send(result); + }); + + rx + } + + async fn load_file_for_task( + task: &FileScanTaskDeleteFile, + file_io: FileIO, + del_filter: DeleteFilter, + schema: SchemaRef, + ) -> Result { + match task.file_type { + DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels( + Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + )), + + DataContentType::EqualityDeletes => { + let sender = { + if let Some(existing) = del_filter + .get_equality_delete_predicate_for_delete_file_path(&task.file_path) + { + return Ok(DeleteFileContext::InProgEqDel(existing.clone())); + } + + let (sender, fut) = EqDelFuture::new(); + + del_filter.insert_equality_delete(task.file_path.to_string(), fut); + + sender + }; + + Ok(DeleteFileContext::FreshEqDel { + batch_stream: Self::evolve_schema( + Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + schema, + ) + .await?, + sender, + }) + } + + DataContentType::Data => Err(Error::new( + ErrorKind::Unexpected, + "tasks with files of type Data not expected here", + )), + } + } + + async fn parse_file_content_for_task( + ctx: DeleteFileContext, + ) -> Result { + match ctx { + DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)), + DeleteFileContext::PosDels(batch_stream) => { + let del_vecs = + Self::parse_positional_deletes_record_batch_stream(batch_stream).await?; + Ok(ParsedDeleteFileContext::DelVecs(del_vecs)) + } + DeleteFileContext::FreshEqDel { + sender, + batch_stream, + } => { + let predicate = + Self::parse_equality_deletes_record_batch_stream(batch_stream).await?; + + sender + .send(predicate) + .map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "Could not send eq delete predicate to state", + ) + }) + .map(|_| ParsedDeleteFileContext::EqDel) + } + } + } + + /// Loads a RecordBatchStream for a given datafile. + async fn parquet_to_batch_stream( + data_file_path: &str, + file_io: FileIO, + ) -> Result { + /* + Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly + as that introduces a circular dependency. 
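+        Note: the hard-coded `false` passed to create_parquet_record_batch_stream_builder
+        below is its `should_load_page_index` argument, so no Parquet page index is loaded
+        when reading a delete file here.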
+ */ + let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder( + data_file_path, + file_io.clone(), + false, + ) + .await? + .build()? + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e))); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + + /// Evolves the schema of the RecordBatches from an equality delete file + async fn evolve_schema( + record_batch_stream: ArrowRecordBatchStream, + target_schema: Arc, + ) -> Result { + let eq_ids = target_schema + .as_ref() + .field_id_to_name_map() + .keys() + .cloned() + .collect::>(); + + let mut record_batch_transformer = + RecordBatchTransformer::build(target_schema.clone(), &eq_ids); + + let record_batch_stream = record_batch_stream.map(move |record_batch| { + record_batch.and_then(|record_batch| { + record_batch_transformer.process_record_batch(record_batch) + }) + }); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + + /// Parses a record batch stream coming from positional delete files + /// + /// Returns a map of data file path to a delete vector + async fn parse_positional_deletes_record_batch_stream( + stream: ArrowRecordBatchStream, + ) -> Result> { + // TODO + + Err(Error::new( + ErrorKind::FeatureUnsupported, + "parsing of positional deletes is not yet supported", + )) + } + + /// Parses record batch streams from individual equality delete files + /// + /// Returns an unbound Predicate for each batch stream + async fn parse_equality_deletes_record_batch_stream( + streams: ArrowRecordBatchStream, + ) -> Result { + // TODO + + Err(Error::new( + ErrorKind::FeatureUnsupported, + "parsing of equality deletes is not yet supported", + )) + } +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::path::Path; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema as ArrowSchema; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::scan::FileScanTask; + use crate::spec::{DataFileFormat, Schema}; + + type ArrowSchemaRef = Arc; + + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + #[tokio::test] + async fn test_delete_file_manager_load_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Note that with the delete file parsing not yet in place, all we can test here is that + // the call to the loader fails with the expected FeatureUnsupportedError. 
+ let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); + + let file_scan_tasks = setup(table_location); + + let result = delete_file_manager + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap(); + + assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + } + + fn setup(table_location: &Path) -> Vec { + let data_file_schema = Arc::new(Schema::builder().build().unwrap()); + let positional_delete_schema = create_pos_del_schema(); + + let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; + let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; + + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col.clone(), + pos_col.clone(), + ]) + .unwrap(); + + let file = File::create(format!( + "{}/pos-del-{}.parquet", + table_location.to_str().unwrap(), + n + )) + .unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&positional_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + + let pos_del_1 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_2 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_3 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let file_scan_tasks = vec![ + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_1, pos_del_2.clone()], + }, + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_2, pos_del_3], + }, + ]; + + file_scan_tasks + } + + fn create_pos_del_schema() -> ArrowSchemaRef { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + } +} diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs deleted file 
mode 100644 index e1ca47679..000000000 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use crate::delete_vector::DeleteVector; -use crate::expr::BoundPredicate; -use crate::io::FileIO; -use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; -use crate::spec::SchemaRef; -use crate::{Error, ErrorKind, Result}; - -#[allow(unused)] -pub trait DeleteFileManager { - /// Read the delete file referred to in the task - /// - /// Returns the raw contents of the delete file as a RecordBatch stream - fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result; -} - -#[allow(unused)] -#[derive(Clone, Debug)] -pub(crate) struct CachingDeleteFileManager { - file_io: FileIO, - concurrency_limit_data_files: usize, -} - -impl DeleteFileManager for CachingDeleteFileManager { - fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result { - // TODO, implementation in https://github.com/apache/iceberg-rust/pull/982 - - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Reading delete files is not yet supported", - )) - } -} - -#[allow(unused_variables)] -impl CachingDeleteFileManager { - pub fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> CachingDeleteFileManager { - Self { - file_io, - concurrency_limit_data_files, - } - } - - pub(crate) async fn load_deletes( - &self, - delete_file_entries: Vec, - ) -> Result<()> { - // TODO - - if !delete_file_entries.is_empty() { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Reading delete files is not yet supported", - )) - } else { - Ok(()) - } - } - - pub(crate) fn build_delete_predicate( - &self, - snapshot_schema: SchemaRef, - ) -> Result> { - // TODO - - Ok(None) - } - - pub(crate) fn get_positional_delete_indexes_for_data_file( - &self, - data_file_path: &str, - ) -> Option> { - // TODO - - None - } -} diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs new file mode 100644 index 000000000..aee69c3cc --- /dev/null +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex, OnceLock, RwLock};
+use std::task::{Context, Poll};
+
+use futures::channel::oneshot;
+
+use crate::delete_vector::DeleteVector;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
+use crate::spec::DataContentType;
+use crate::{Error, ErrorKind, Result};
+
+// Equality deletes may apply to more than one DataFile in a scan, and so
+// the same equality delete file may be present in more than one invocation of
+// CachingDeleteFileLoader::load_deletes in the same scan. We want to deduplicate these
+// to avoid having to load them twice, so we immediately store cloneable futures in the
+// state that can be awaited upon to get the EQ deletes. That way we can check to see if
+// a load of each EQ delete file is already in progress and avoid starting another one.
+#[derive(Debug, Clone)]
+pub(crate) struct EqDelFuture {
+    result: OnceLock<Predicate>,
+}
+
+impl EqDelFuture {
+    pub(crate) fn new() -> (oneshot::Sender<Predicate>, Self) {
+        let (tx, rx) = oneshot::channel();
+        let result = OnceLock::new();
+
+        crate::runtime::spawn({
+            let result = result.clone();
+            async move { result.set(rx.await.unwrap()) }
+        });
+
+        (tx, Self { result })
+    }
+}
+
+impl Future for EqDelFuture {
+    type Output = Predicate;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match self.result.get() {
+            None => Poll::Pending,
+            Some(predicate) => Poll::Ready(predicate.clone()),
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct DeleteFileFilterState {
+    delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
+    equality_deletes: HashMap<String, EqDelFuture>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct DeleteFilter {
+    state: Arc<RwLock<DeleteFileFilterState>>,
+}
+
+impl DeleteFilter {
+    /// Retrieve a delete vector for the data file associated with a given file scan task
+    pub fn get_delete_vector(
+        &self,
+        file_scan_task: &FileScanTask,
+    ) -> Option<Arc<Mutex<DeleteVector>>> {
+        self.get_delete_vector_for_path(file_scan_task.data_file_path())
+    }
+
+    /// Retrieve a delete vector for a data file
+    pub fn get_delete_vector_for_path(
+        &self,
+        delete_file_path: &str,
+    ) -> Option<Arc<Mutex<DeleteVector>>> {
+        self.state
+            .read()
+            .ok()
+            .and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
+    }
+
+    /// Retrieve the equality delete predicate for a given eq delete file path
+    pub(crate) fn get_equality_delete_predicate_for_delete_file_path(
+        &self,
+        file_path: &str,
+    ) -> Option<EqDelFuture> {
+        self.state
+            .read()
+            .unwrap()
+            .equality_deletes
+            .get(file_path)
+            .cloned()
+    }
+
+    /// Builds the equality delete predicate for the provided task.
+ pub async fn build_equality_delete_predicate( + &self, + file_scan_task: &FileScanTask, + ) -> Result> { + // * Filter the task's deletes into just the Equality deletes + // * Retrieve the unbound predicate for each from self.state.equality_deletes + // * Logical-AND them all together to get a single combined `Predicate` + // * Bind the predicate to the task's schema to get a `BoundPredicate` + + let mut combined_predicate = AlwaysTrue; + for delete in &file_scan_task.deletes { + if !is_equality_delete(delete) { + continue; + } + + let Some(predicate) = + self.get_equality_delete_predicate_for_delete_file_path(&delete.file_path) + else { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Missing predicate for equality delete file '{}'", + delete.file_path + ), + )); + }; + + combined_predicate = combined_predicate.and(predicate.await); + } + + if combined_predicate == AlwaysTrue { + return Ok(None); + } + + // TODO: handle case-insensitive case + let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; + Ok(Some(bound_predicate)) + } + + pub(crate) fn upsert_delete_vector( + &mut self, + data_file_path: String, + delete_vector: DeleteVector, + ) { + let mut state = self.state.write().unwrap(); + + let Some(entry) = state.delete_vectors.get_mut(&data_file_path) else { + state + .delete_vectors + .insert(data_file_path, Arc::new(Mutex::new(delete_vector))); + return; + }; + + *entry.lock().unwrap() |= delete_vector; + } + + pub(crate) fn insert_equality_delete(&self, delete_file_path: String, eq_del: EqDelFuture) { + let mut state = self.state.write().unwrap(); + + state.equality_deletes.insert(delete_file_path, eq_del); + } +} + +pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { + matches!(f.file_type, DataContentType::EqualityDeletes) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::path::Path; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema as ArrowSchema; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::arrow::delete_file_loader::CachingDeleteFileLoader; + use crate::io::FileIO; + use crate::spec::{DataFileFormat, Schema}; + + type ArrowSchemaRef = Arc; + + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + #[tokio::test] + async fn test_delete_file_manager_load_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Note that with the delete file parsing not yet in place, all we can test here is that + // the call to the loader fails with the expected FeatureUnsupportedError. 
+ let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); + + let file_scan_tasks = setup(table_location); + + let result = delete_file_manager + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap(); + + assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + } + + fn setup(table_location: &Path) -> Vec { + let data_file_schema = Arc::new(Schema::builder().build().unwrap()); + let positional_delete_schema = create_pos_del_schema(); + + let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; + let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; + + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col.clone(), + pos_col.clone(), + ]) + .unwrap(); + + let file = File::create(format!( + "{}/pos-del-{}.parquet", + table_location.to_str().unwrap(), + n + )) + .unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&positional_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + + let pos_del_1 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_2 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_3 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let file_scan_tasks = vec![ + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_1, pos_del_2.clone()], + }, + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_2, pos_del_3], + }, + ]; + + file_scan_tasks + } + + fn create_pos_del_schema() -> ArrowSchemaRef { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + } +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 56caeaf55..c5c144853 100644 --- 
a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -23,7 +23,8 @@ pub use schema::*; mod nan_val_cnt_visitor; pub(crate) use nan_val_cnt_visitor::*; -pub(crate) mod delete_file_manager; +pub(crate) mod delete_file_loader; +pub(crate) mod delete_filter; mod reader; pub(crate) mod record_batch_projector; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4ac993aee..9e1f4d33a 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -41,7 +41,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use crate::arrow::delete_file_manager::CachingDeleteFileManager; +use crate::arrow::delete_file_loader::CachingDeleteFileLoader; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -63,6 +63,7 @@ pub struct ArrowReaderBuilder { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, } impl ArrowReaderBuilder { @@ -76,6 +77,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + delete_file_support_enabled: false, } } @@ -104,18 +106,25 @@ impl ArrowReaderBuilder { self } + /// Determines whether to enable delete file support. + pub fn with_delete_file_support_enabled(mut self, delete_file_support_enabled: bool) -> Self { + self.delete_file_support_enabled = delete_file_support_enabled; + self + } + /// Build the ArrowReader. 
pub fn build(self) -> ArrowReader { ArrowReader { batch_size: self.batch_size, file_io: self.file_io.clone(), - delete_file_manager: CachingDeleteFileManager::new( + delete_file_loader: CachingDeleteFileLoader::new( self.file_io.clone(), self.concurrency_limit_data_files, ), concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + delete_file_support_enabled: self.delete_file_support_enabled, } } } @@ -125,13 +134,14 @@ impl ArrowReaderBuilder { pub struct ArrowReader { batch_size: Option, file_io: FileIO, - delete_file_manager: CachingDeleteFileManager, + delete_file_loader: CachingDeleteFileLoader, /// the maximum number of data files that can be fetched at the same time concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, } impl ArrowReader { @@ -143,6 +153,7 @@ impl ArrowReader { let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; + let delete_file_support_enabled = self.delete_file_support_enabled; let stream = tasks .map_ok(move |task| { @@ -152,9 +163,10 @@ impl ArrowReader { task, batch_size, file_io, - self.delete_file_manager.clone(), + self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, + delete_file_support_enabled, ) }) .map_err(|err| { @@ -166,26 +178,40 @@ impl ArrowReader { Ok(Box::pin(stream) as ArrowRecordBatchStream) } + #[allow(clippy::too_many_arguments)] async fn process_file_scan_task( task: FileScanTask, batch_size: Option, file_io: FileIO, - delete_file_manager: CachingDeleteFileManager, + delete_file_loader: CachingDeleteFileLoader, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, ) -> Result { + if !delete_file_support_enabled && !task.deletes.is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete file support is not enabled", + )); + } + let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); - // concurrently retrieve delete files and create RecordBatchStreamBuilder - let (_, mut record_batch_stream_builder) = try_join!( - delete_file_manager.load_deletes(task.deletes.clone()), - Self::create_parquet_record_batch_stream_builder( - &task.data_file_path, - file_io.clone(), - should_load_page_index, - ) - )?; + let delete_filter_rx = delete_file_loader.load_deletes( + if delete_file_support_enabled { + &task.deletes + } else { + &[] + }, + task.schema.clone(), + ); + let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder( + &task.data_file_path, + file_io.clone(), + should_load_page_index, + ) + .await?; // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response @@ -207,7 +233,8 @@ impl ArrowReader { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } - let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?; + let delete_filter = delete_filter_rx.await.unwrap()?; + let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?; // In addition to the optional predicate supplied in the `FileScanTask`, // we also have an optional predicate resulting from equality delete files. 
@@ -275,15 +302,18 @@ impl ArrowReader { } } - let positional_delete_indexes = - delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path); + let positional_delete_indexes = delete_filter.get_delete_vector(&task); if let Some(positional_delete_indexes) = positional_delete_indexes { - let delete_row_selection = Self::build_deletes_row_selection( - record_batch_stream_builder.metadata().row_groups(), - &selected_row_group_indices, - positional_delete_indexes.as_ref(), - )?; + let delete_row_selection = { + let positional_delete_indexes = positional_delete_indexes.lock().unwrap(); + + Self::build_deletes_row_selection( + record_batch_stream_builder.metadata().row_groups(), + &selected_row_group_indices, + &positional_delete_indexes, + ) + }?; // merge the row selection from the delete files with the row selection // from the filter predicate, if there is one from the filter predicate @@ -318,7 +348,7 @@ impl ArrowReader { Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } - async fn create_parquet_record_batch_stream_builder( + pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, should_load_page_index: bool, @@ -1716,7 +1746,7 @@ message schema { /* cases to cover: * {skip|select} {first|intermediate|last} {one row|multiple rows} in - {first|imtermediate|last} {skipped|selected} row group + {first|intermediate|last} {skipped|selected} row group * row group selection disabled */ diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index 57c15ffec..feb4eeea9 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::ops::BitOrAssign; + use roaring::bitmap::Iter; use roaring::treemap::BitmapIter; use roaring::RoaringTreemap; -#[allow(unused)] +#[derive(Debug, Default)] pub struct DeleteVector { inner: RoaringTreemap, } @@ -103,3 +105,9 @@ impl DeleteVectorIterator<'_> { inner.bitmap_iter.advance_to(lo); } } + +impl BitOrAssign for DeleteVector { + fn bitor_assign(&mut self, other: Self) { + self.inner.bitor_assign(&other.inner); + } +}
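For reviewers who want to see the deduplication idea from `delete_filter.rs` in isolation: the sketch below shows the same "store a cloneable future so concurrent scan tasks share one equality-delete load" pattern, but expressed with `futures::future::Shared` rather than this PR's hand-rolled `EqDelFuture`. All names (`PredicateCache`, `get_or_insert`) are hypothetical, and the `String` payload stands in for the parsed `Predicate`; it illustrates the technique and is not code from this change.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use futures::future::{BoxFuture, FutureExt, Shared};

// A cloneable handle to an in-flight (or completed) load, shared by all interested tasks.
type SharedLoad = Shared<BoxFuture<'static, String>>;

#[derive(Clone, Default)]
struct PredicateCache {
    // Keyed by equality-delete file path, mirroring DeleteFileFilterState::equality_deletes.
    in_flight: Arc<Mutex<HashMap<String, SharedLoad>>>,
}

impl PredicateCache {
    // Return the existing future for `path` if one is registered, otherwise register `load`.
    fn get_or_insert(&self, path: &str, load: BoxFuture<'static, String>) -> SharedLoad {
        let mut map = self.in_flight.lock().unwrap();
        map.entry(path.to_string())
            .or_insert_with(|| load.shared())
            .clone()
    }
}

#[tokio::main]
async fn main() {
    let cache = PredicateCache::default();

    // Two scan tasks reference the same equality delete file; only the first load is registered.
    let first = cache.get_or_insert("eq-del-1.parquet", async { "id = 1".to_string() }.boxed());
    let second = cache.get_or_insert("eq-del-1.parquet", async { "never-run".to_string() }.boxed());

    assert_eq!(first.await, "id = 1");
    assert_eq!(second.await, "id = 1"); // resolved by the shared, already-registered load
}
```

The `EqDelFuture` in this PR achieves the same effect by spawning a task that writes the received `Predicate` into a `OnceLock`, which every poll of the future checks.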