From a745174c82bb36e5c89e7e29f96288eb0be3c039 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Fri, 21 Feb 2025 08:10:06 +0000 Subject: [PATCH 1/5] feat: delete file manager loading --- .../iceberg/src/arrow/delete_file_manager.rs | 544 +++++++++++++++++- crates/iceberg/src/arrow/reader.rs | 50 +- crates/iceberg/src/delete_vector.rs | 8 +- 3 files changed, 567 insertions(+), 35 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs index e1ca47679..388166cae 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_manager.rs @@ -15,13 +15,23 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, OnceLock, RwLock}; +use std::task::{Context, Poll}; +use futures::channel::oneshot; +use futures::future::join_all; +use futures::{StreamExt, TryStreamExt}; + +use crate::arrow::ArrowReader; use crate::delete_vector::DeleteVector; -use crate::expr::BoundPredicate; +use crate::expr::Predicate::AlwaysTrue; +use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; -use crate::spec::SchemaRef; +use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile}; +use crate::spec::DataContentType; use crate::{Error, ErrorKind, Result}; #[allow(unused)] @@ -37,6 +47,7 @@ pub trait DeleteFileManager { pub(crate) struct CachingDeleteFileManager { file_io: FileIO, concurrency_limit_data_files: usize, + state: Arc>, } impl DeleteFileManager for CachingDeleteFileManager { @@ -49,47 +60,532 @@ impl DeleteFileManager for CachingDeleteFileManager { )) } } +// Equality deletes may apply to more than one DataFile in a scan, and so +// the same equality delete file may be present in more than one invocation of +// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these +// to avoid having to load them twice, so we immediately store cloneable futures in the +// state that can be awaited upon to get te EQ deletes. That way we can check to see if +// a load of each Eq delete file is already in progress and avoid starting another one. +#[derive(Debug, Clone)] +struct EqDelFuture { + result: OnceLock, +} + +impl EqDelFuture { + pub fn new() -> (oneshot::Sender, Self) { + let (tx, rx) = oneshot::channel(); + let result = OnceLock::new(); + + crate::runtime::spawn({ + let result = result.clone(); + async move { result.set(rx.await.unwrap()) } + }); + + (tx, Self { result }) + } +} + +impl Future for EqDelFuture { + type Output = Predicate; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + match self.result.get() { + None => Poll::Pending, + Some(predicate) => Poll::Ready(predicate.clone()), + } + } +} + +#[derive(Debug, Default)] +struct DeleteFileManagerState { + // delete vectors and positional deletes get merged when loaded into a single delete vector + // per data file + delete_vectors: HashMap>>, + + // equality delete files are parsed into unbound `Predicate`s. We store them here as + // cloneable futures (see note below) + equality_deletes: HashMap, +} + +type StateRef = Arc>; + +// Intermediate context during processing of a delete file task. 
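Editorial note on the block above: `EqDelFuture` exists because several scan tasks can reference the same equality delete file, and all of them should await one shared load rather than each starting their own. A minimal, standalone sketch of that idea — using `futures::FutureExt::shared` instead of the hand-rolled `OnceLock` future; the names, the oneshot payload, and the executor choice here are illustrative only and are not part of this patch:

```rust
use futures::channel::oneshot;
use futures::FutureExt;

fn main() {
    // One load produces the parsed predicate; many scan tasks await the same result.
    let (tx, rx) = oneshot::channel::<String>();
    let shared = async move { rx.await.expect("sender dropped") }.shared();

    // Each scan task clones the shared future instead of starting another load
    // of the same equality delete file.
    let task_a = shared.clone();
    let task_b = shared.clone();

    // The single in-progress load eventually resolves the shared future.
    tx.send("id = 42".to_string()).unwrap();

    futures::executor::block_on(async {
        assert_eq!(task_a.await, "id = 42");
        assert_eq!(task_b.await, "id = 42");
    });
}
```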
+enum DeleteFileContext { + // TODO: Delete Vector loader from Puffin files + InProgEqDel(EqDelFuture), + PosDels(ArrowRecordBatchStream), + FreshEqDel { + batch_stream: ArrowRecordBatchStream, + sender: oneshot::Sender, + }, +} + +// Final result of the processing of a delete file task before +// results are fully merged into the DeleteFileManager's state +enum ParsedDeleteFileContext { + InProgEqDel(EqDelFuture), + DelVecs(HashMap), + EqDel, +} #[allow(unused_variables)] impl CachingDeleteFileManager { - pub fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> CachingDeleteFileManager { - Self { + pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self { + CachingDeleteFileManager { file_io, concurrency_limit_data_files, + state: Arc::new(Default::default()), } } pub(crate) async fn load_deletes( &self, - delete_file_entries: Vec, + delete_file_entries: &[FileScanTaskDeleteFile], ) -> Result<()> { - // TODO + /* + * Create a single stream of all delete file tasks irrespective of type, + so that we can respect the combined concurrency limit + * We then process each in two phases: load and parse. + * for positional deletes the load phase instantiates an ArrowRecordBatchStream to + stream the file contents out + * for eq deletes, we first check if the EQ delete is already loaded or being loaded by + another concurrently processing data file scan task. If it is, we return a future + for the pre-existing task from the load phase. If not, we create such a future + and store it in the state to prevent other data file tasks from starting to load + the same equality delete file, and return a record batch stream from the load phase + as per the other delete file types - only this time it is accompanied by a one-shot + channel sender that we will eventually use to resolve the shared future that we stored + in the state. + * When this gets updated to add support for delete vectors, the load phase will return + a PuffinReader for them. + * The parse phase parses each record batch stream according to its associated data type. + The result of this is a map of data file paths to delete vectors for the positional + delete tasks (and in future for the delete vector tasks). For equality delete + file tasks, this results in an unbound Predicate. + * The unbound Predicates resulting from equality deletes are sent to their associated oneshot + channel to store them in the right place in the delete file manager's state. + * The results of all of these futures are awaited on in parallel with the specified + level of concurrency and collected into a vec. We then combine all of the delete + vector maps that resulted from any positional delete or delete vector files into a + single map and persist it in the state. 
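Editorial note: the bullet points above describe a two-phase pipeline (load, then parse) driven under one combined concurrency limit. A cut-down, self-contained sketch of that `futures` pattern, with placeholder `load`/`parse` functions standing in for the real delete-file handling — everything below is illustrative and not part of this patch's API:

```rust
use std::io;

use futures::{stream, StreamExt, TryStreamExt};

async fn load(path: String) -> io::Result<String> {
    Ok(path) // stand-in for opening a record batch stream for a delete file
}

async fn parse(contents: String) -> io::Result<usize> {
    Ok(contents.len()) // stand-in for building a delete vector / predicate
}

async fn run(paths: Vec<String>, concurrency_limit: usize) -> io::Result<Vec<usize>> {
    stream::iter(paths)
        // Wrap each two-phase future in `Ok` so that `try_buffer_unordered` can
        // drive them with bounded concurrency and stop at the first error.
        .map(|p| Ok::<_, io::Error>(async move { parse(load(p).await?).await }))
        .try_buffer_unordered(concurrency_limit)
        .try_collect()
        .await
}

fn main() -> io::Result<()> {
    let paths = vec!["pos-del-1.parquet".to_string(), "eq-del-1.parquet".to_string()];
    let sizes = futures::executor::block_on(run(paths, 4))?;
    println!("{sizes:?}");
    Ok(())
}
```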
- if !delete_file_entries.is_empty() { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Reading delete files is not yet supported", - )) - } else { - Ok(()) + + Conceptually, the data flow is like this: + + FileScanTaskDeleteFile + | + Already-loading EQ Delete | Everything Else + +---------------------------------------------------+ + | | + [get existing future] [load recordbatch stream / puffin] + DeleteFileContext::InProgEqDel DeleteFileContext + | | + | | + | +-----------------------------+--------------------------+ + | Pos Del Del Vec (Not yet Implemented) EQ Del + | | | | + | [parse pos del stream] [parse del vec puffin] [parse eq del] + | HashMap HashMap (Predicate, Sender) + | | | | + | | | [persist to state] + | | | () + | | | | + | +-----------------------------+--------------------------+ + | | + | [buffer unordered] + | | + | [combine del vectors] + | HashMap + | | + | [persist del vectors to state] + | () + | | + +-------------------------+-------------------------+ + | + [join!] + */ + + let stream_items = delete_file_entries + .iter() + .map(|t| (t.clone(), self.file_io.clone(), self.state.clone())) + .collect::>(); + // NOTE: removing the collect and just passing the iterator to futures::stream:iter + // results in an error 'implementation of `std::ops::FnOnce` is not general enough' + + let task_stream = futures::stream::iter(stream_items.into_iter()); + + let results: Vec = task_stream + .map(move |(task, file_io, state_ref)| async { + Self::load_file_for_task(task, file_io, state_ref).await + }) + .map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await })) + .try_buffer_unordered(self.concurrency_limit_data_files) + .try_collect::>() + .await?; + + // wait for all in-progress EQ deletes from other tasks + let _ = join_all(results.iter().filter_map(|i| { + if let ParsedDeleteFileContext::InProgEqDel(fut) = i { + Some(fut.clone()) + } else { + None + } + })) + .await; + + let merged_delete_vectors = results + .into_iter() + .fold(HashMap::default(), Self::merge_delete_vectors); + + self.state.write().unwrap().delete_vectors = merged_delete_vectors; + + Ok(()) + } + + async fn load_file_for_task( + task: FileScanTaskDeleteFile, + file_io: FileIO, + state: StateRef, + ) -> Result { + match task.file_type { + DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels( + Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + )), + + DataContentType::EqualityDeletes => { + let (sender, fut) = EqDelFuture::new(); + { + let mut state = state.write().unwrap(); + + if let Some(existing) = state.equality_deletes.get(&task.file_path) { + return Ok(DeleteFileContext::InProgEqDel(existing.clone())); + } + + state + .equality_deletes + .insert(task.file_path.to_string(), fut); + } + + Ok(DeleteFileContext::FreshEqDel { + batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + sender, + }) + } + + DataContentType::Data => Err(Error::new( + ErrorKind::Unexpected, + "tasks with files of type Data not expected here", + )), + } + } + + async fn parse_file_content_for_task( + ctx: DeleteFileContext, + ) -> Result { + match ctx { + DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)), + DeleteFileContext::PosDels(batch_stream) => { + let del_vecs = + Self::parse_positional_deletes_record_batch_stream(batch_stream).await?; + Ok(ParsedDeleteFileContext::DelVecs(del_vecs)) + } + DeleteFileContext::FreshEqDel { + sender, + batch_stream, + } => { + let predicate = + 
Self::parse_equality_deletes_record_batch_stream(batch_stream).await?; + + sender + .send(predicate) + .map_err(|err| { + Error::new( + ErrorKind::Unexpected, + "Could not send eq delete predicate to state", + ) + }) + .map(|_| ParsedDeleteFileContext::EqDel) + } } } - pub(crate) fn build_delete_predicate( + fn merge_delete_vectors( + mut merged_delete_vectors: HashMap>>, + item: ParsedDeleteFileContext, + ) -> HashMap>> { + if let ParsedDeleteFileContext::DelVecs(del_vecs) = item { + del_vecs.into_iter().for_each(|(key, val)| { + let entry = merged_delete_vectors.entry(key).or_default(); + { + let mut inner = entry.write().unwrap(); + (*inner).intersect_assign(&val); + } + }); + } + + merged_delete_vectors + } + + /// Loads a RecordBatchStream for a given datafile. + async fn parquet_to_batch_stream( + data_file_path: &str, + file_io: FileIO, + ) -> Result { + /* + Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly + as that introduces a circular dependency. + */ + let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder( + data_file_path, + file_io.clone(), + false, + ) + .await? + .build()? + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e))); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + + /// Parses a record batch stream coming from positional delete files + /// + /// Returns a map of data file path to a delete vector + async fn parse_positional_deletes_record_batch_stream( + stream: ArrowRecordBatchStream, + ) -> Result> { + // TODO + + Ok(HashMap::default()) + } + + /// Parses record batch streams from individual equality delete files + /// + /// Returns an unbound Predicate for each batch stream + async fn parse_equality_deletes_record_batch_stream( + streams: ArrowRecordBatchStream, + ) -> Result { + // TODO + + Ok(AlwaysTrue) + } + + /// Builds eq delete predicate for the provided task. + /// + /// Must await on load_deletes before calling this. + pub(crate) async fn build_delete_predicate_for_task( &self, - snapshot_schema: SchemaRef, + file_scan_task: &FileScanTask, ) -> Result> { - // TODO + // * Filter the task's deletes into just the Equality deletes + // * Retrieve the unbound predicate for each from self.state.equality_deletes + // * Logical-AND them all together to get a single combined `Predicate` + // * Bind the predicate to the task's schema to get a `BoundPredicate` + + let mut combined_predicate = AlwaysTrue; + for delete in &file_scan_task.deletes { + if !is_equality_delete(delete) { + continue; + } + + let predicate = { + let state = self.state.read().unwrap(); + + let Some(predicate) = state.equality_deletes.get(&delete.file_path) else { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Missing predicate for equality delete file '{}'", + delete.file_path + ), + )); + }; - Ok(None) + predicate.clone() + }; + + combined_predicate = combined_predicate.and(predicate.await); + } + + if combined_predicate == AlwaysTrue { + return Ok(None); + } + + // TODO: handle case-insensitive case + let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; + Ok(Some(bound_predicate)) } - pub(crate) fn get_positional_delete_indexes_for_data_file( + /// Retrieve a delete vector for the data file associated with a given file scan task + /// + /// Should only be called after awaiting on load_deletes. 
Takes the vector to avoid a + /// clone since each item is specific to a single data file and won't need to be used again + pub(crate) fn get_delete_vector_for_task( &self, - data_file_path: &str, - ) -> Option> { - // TODO + file_scan_task: &FileScanTask, + ) -> Option>> { + self.state + .write() + .unwrap() + .delete_vectors + .get(file_scan_task.data_file_path()) + .map(Clone::clone) + } +} + +pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { + matches!(f.file_type, DataContentType::EqualityDeletes) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::path::Path; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema as ArrowSchema; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::spec::{DataFileFormat, Schema}; + + type ArrowSchemaRef = Arc; + + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + #[tokio::test] + async fn test_delete_file_manager_load_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Note that with the delete file parsing not yet in place, all we can test here is that + // the call to the loader does not fail. + let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10); + + let file_scan_tasks = setup(table_location); + + delete_file_manager + .load_deletes(&file_scan_tasks[0].deletes) + .await + .unwrap(); + } + + fn setup(table_location: &Path) -> Vec { + let data_file_schema = Arc::new(Schema::builder().build().unwrap()); + let positional_delete_schema = create_pos_del_schema(); + + let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; + let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; + + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col.clone(), + pos_col.clone(), + ]) + .unwrap(); + + let file = File::create(format!( + "{}/pos-del-{}.parquet", + table_location.to_str().unwrap(), + n + )) + .unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&positional_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + + let pos_del_1 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_2 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_3 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 
0, + equality_ids: vec![], + }; + + let file_scan_tasks = vec![ + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_1, pos_del_2.clone()], + }, + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_2, pos_del_3], + }, + ]; + + file_scan_tasks + } - None + fn create_pos_del_schema() -> ArrowSchemaRef { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) } } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4ac993aee..fe0ef2fb8 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -63,6 +63,7 @@ pub struct ArrowReaderBuilder { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, } impl ArrowReaderBuilder { @@ -76,6 +77,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + delete_file_support_enabled: false, } } @@ -104,6 +106,12 @@ impl ArrowReaderBuilder { self } + /// Determines whether to enable delete file support. + pub fn with_delete_file_support_enabled(mut self, delete_file_support_enabled: bool) -> Self { + self.delete_file_support_enabled = delete_file_support_enabled; + self + } + /// Build the ArrowReader. 
pub fn build(self) -> ArrowReader { ArrowReader { @@ -116,6 +124,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + delete_file_support_enabled: self.delete_file_support_enabled, } } } @@ -132,6 +141,7 @@ pub struct ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, } impl ArrowReader { @@ -143,6 +153,7 @@ impl ArrowReader { let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; + let delete_file_support_enabled = self.delete_file_support_enabled; let stream = tasks .map_ok(move |task| { @@ -155,6 +166,7 @@ impl ArrowReader { self.delete_file_manager.clone(), row_group_filtering_enabled, row_selection_enabled, + delete_file_support_enabled, ) }) .map_err(|err| { @@ -166,6 +178,7 @@ impl ArrowReader { Ok(Box::pin(stream) as ArrowRecordBatchStream) } + #[allow(clippy::too_many_arguments)] async fn process_file_scan_task( task: FileScanTask, batch_size: Option, @@ -173,13 +186,25 @@ impl ArrowReader { delete_file_manager: CachingDeleteFileManager, row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_support_enabled: bool, ) -> Result { + if !delete_file_support_enabled && !task.deletes.is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete file support is not enabled", + )); + } + let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); // concurrently retrieve delete files and create RecordBatchStreamBuilder let (_, mut record_batch_stream_builder) = try_join!( - delete_file_manager.load_deletes(task.deletes.clone()), + delete_file_manager.load_deletes(if delete_file_support_enabled { + &task.deletes + } else { + &[] + },), Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), @@ -207,7 +232,9 @@ impl ArrowReader { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } - let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?; + let delete_predicate = delete_file_manager + .build_delete_predicate_for_task(&task) + .await?; // In addition to the optional predicate supplied in the `FileScanTask`, // we also have an optional predicate resulting from equality delete files. @@ -275,15 +302,18 @@ impl ArrowReader { } } - let positional_delete_indexes = - delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path); + let positional_delete_indexes = delete_file_manager.get_delete_vector_for_task(&task); if let Some(positional_delete_indexes) = positional_delete_indexes { - let delete_row_selection = Self::build_deletes_row_selection( - record_batch_stream_builder.metadata().row_groups(), - &selected_row_group_indices, - positional_delete_indexes.as_ref(), - )?; + let delete_row_selection = { + let positional_delete_indexes = positional_delete_indexes.read().unwrap(); + + Self::build_deletes_row_selection( + record_batch_stream_builder.metadata().row_groups(), + &selected_row_group_indices, + &positional_delete_indexes, + )? 
+ }; // merge the row selection from the delete files with the row selection // from the filter predicate, if there is one from the filter predicate @@ -318,7 +348,7 @@ impl ArrowReader { Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } - async fn create_parquet_record_batch_stream_builder( + pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, should_load_page_index: bool, diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index 57c15ffec..1dba780e1 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::ops::BitOrAssign; + use roaring::bitmap::Iter; use roaring::treemap::BitmapIter; use roaring::RoaringTreemap; -#[allow(unused)] +#[derive(Debug, Default)] pub struct DeleteVector { inner: RoaringTreemap, } @@ -36,6 +38,10 @@ impl DeleteVector { let outer = self.inner.bitmaps(); DeleteVectorIterator { outer, inner: None } } + + pub fn intersect_assign(&mut self, other: &DeleteVector) { + self.inner.bitor_assign(&other.inner); + } } // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here. From 8dcc004fb516c06e48de1d205fcd2f241806c4a1 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Fri, 28 Mar 2025 08:05:02 +0000 Subject: [PATCH 2/5] feat: changes suggested in review --- .../iceberg/src/arrow/delete_file_manager.rs | 138 +++++++++--------- 1 file changed, 71 insertions(+), 67 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs index 388166cae..9b1472eb4 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_manager.rs @@ -138,73 +138,75 @@ impl CachingDeleteFileManager { } } + /// Load the deletes for all the specified tasks + /// + /// Returned future completes once all loading has finished. + /// + /// * Create a single stream of all delete file tasks irrespective of type, + /// so that we can respect the combined concurrency limit + /// * We then process each in two phases: load and parse. + /// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to + /// stream the file contents out + /// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by + /// another concurrently processing data file scan task. If it is, we return a future + /// for the pre-existing task from the load phase. If not, we create such a future + /// and store it in the state to prevent other data file tasks from starting to load + /// the same equality delete file, and return a record batch stream from the load phase + /// as per the other delete file types - only this time it is accompanied by a one-shot + /// channel sender that we will eventually use to resolve the shared future that we stored + /// in the state. + /// * When this gets updated to add support for delete vectors, the load phase will return + /// a PuffinReader for them. + /// * The parse phase parses each record batch stream according to its associated data type. + /// The result of this is a map of data file paths to delete vectors for the positional + /// delete tasks (and in future for the delete vector tasks). For equality delete + /// file tasks, this results in an unbound Predicate. 
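Editorial note: for the positional-delete path, the per-file results are folded into one `DeleteVector` per data file. Note that `intersect_assign` introduced earlier in this patch delegates to `bitor_assign`, so the merge is a set union of deleted row positions. A small standalone sketch of that union using the underlying `roaring` treemap (not the patch's own types):

```rust
use roaring::RoaringTreemap;

fn main() {
    // Positions deleted by two different positional delete files
    // that both refer to the same data file.
    let mut merged = RoaringTreemap::new();
    for pos in [0u64, 1, 3, 5] {
        merged.insert(pos);
    }

    let mut from_other_file = RoaringTreemap::new();
    for pos in [3u64, 1022, 1023] {
        from_other_file.insert(pos);
    }

    // A row is deleted if *any* delete file marks it, so merging is a union.
    merged |= &from_other_file;

    assert!(merged.contains(1022));
    assert_eq!(merged.len(), 6);
}
```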
+ /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot + /// channel to store them in the right place in the delete file managers state. + /// * The results of all of these futures are awaited on in parallel with the specified + /// level of concurrency and collected into a vec. We then combine all of the delete + /// vector maps that resulted from any positional delete or delete vector files into a + /// single map and persist it in the state. + /// + /// + /// Conceptually, the data flow is like this: + /// ```none + /// FileScanTaskDeleteFile + /// | + /// Already-loading EQ Delete | Everything Else + /// +---------------------------------------------------+ + /// | | + /// [get existing future] [load recordbatch stream / puffin] + /// DeleteFileContext::InProgEqDel DeleteFileContext + /// | | + /// | | + /// | +-----------------------------+--------------------------+ + /// | Pos Del Del Vec (Not yet Implemented) EQ Del + /// | | | | + /// | [parse pos del stream] [parse del vec puffin] [parse eq del] + /// | HashMap HashMap (Predicate, Sender) + /// | | | | + /// | | | [persist to state] + /// | | | () + /// | | | | + /// | +-----------------------------+--------------------------+ + /// | | + /// | [buffer unordered] + /// | | + /// | [combine del vectors] + /// | HashMap + /// | | + /// | [persist del vectors to state] + /// | () + /// | | + /// +-------------------------+-------------------------+ + /// | + /// [join!] + /// ``` pub(crate) async fn load_deletes( &self, delete_file_entries: &[FileScanTaskDeleteFile], ) -> Result<()> { - /* - * Create a single stream of all delete file tasks irrespective of type, - so that we can respect the combined concurrency limit - * We then process each in two phases: load and parse. - * for positional deletes the load phase instantiates an ArrowRecordBatchStream to - stream the file contents out - * for eq deletes, we first check if the EQ delete is already loaded or being loaded by - another concurrently processing data file scan task. If it is, we return a future - for the pre-existing task from the load phase. If not, we create such a future - and store it in the state to prevent other data file tasks from starting to load - the same equality delete file, and return a record batch stream from the load phase - as per the other delete file types - only this time it is accompanied by a one-shot - channel sender that we will eventually use to resolve the shared future that we stored - in the state. - * When this gets updated to add support for delete vectors, the load phase will return - a PuffinReader for them. - * The parse phase parses each record batch stream according to its associated data type. - The result of this is a map of data file paths to delete vectors for the positional - delete tasks (and in future for the delete vector tasks). For equality delete - file tasks, this results in an unbound Predicate. - * The unbound Predicates resulting from equality deletes are sent to their associated oneshot - channel to store them in the right place in the delete file manager's state. - * The results of all of these futures are awaited on in parallel with the specified - level of concurrency and collected into a vec. We then combine all of the delete - vector maps that resulted from any positional delete or delete vector files into a - single map and persist it in the state. 
- - - Conceptually, the data flow is like this: - - FileScanTaskDeleteFile - | - Already-loading EQ Delete | Everything Else - +---------------------------------------------------+ - | | - [get existing future] [load recordbatch stream / puffin] - DeleteFileContext::InProgEqDel DeleteFileContext - | | - | | - | +-----------------------------+--------------------------+ - | Pos Del Del Vec (Not yet Implemented) EQ Del - | | | | - | [parse pos del stream] [parse del vec puffin] [parse eq del] - | HashMap HashMap (Predicate, Sender) - | | | | - | | | [persist to state] - | | | () - | | | | - | +-----------------------------+--------------------------+ - | | - | [buffer unordered] - | | - | [combine del vectors] - | HashMap - | | - | [persist del vectors to state] - | () - | | - +-------------------------+-------------------------+ - | - [join!] - */ - let stream_items = delete_file_entries .iter() .map(|t| (t.clone(), self.file_io.clone(), self.state.clone())) @@ -253,18 +255,20 @@ impl CachingDeleteFileManager { )), DataContentType::EqualityDeletes => { - let (sender, fut) = EqDelFuture::new(); - { + let sender = { let mut state = state.write().unwrap(); - if let Some(existing) = state.equality_deletes.get(&task.file_path) { return Ok(DeleteFileContext::InProgEqDel(existing.clone())); } + let (sender, fut) = EqDelFuture::new(); + state .equality_deletes .insert(task.file_path.to_string(), fut); - } + + sender + }; Ok(DeleteFileContext::FreshEqDel { batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?, From fff72ef22faf8b9dd4ee382ccf7f1d83dfbfebdd Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 17 Apr 2025 07:35:03 +0100 Subject: [PATCH 3/5] feat: return Err for unimplemented delete vec parse methods and make DeleteVector::intersect_assign pub(crate) --- .../iceberg/src/arrow/delete_file_manager.rs | 19 +++++++++++++------ crates/iceberg/src/arrow/reader.rs | 2 +- crates/iceberg/src/delete_vector.rs | 2 +- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs index 9b1472eb4..7eb84aff7 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_manager.rs @@ -359,7 +359,10 @@ impl CachingDeleteFileManager { ) -> Result> { // TODO - Ok(HashMap::default()) + Err(Error::new( + ErrorKind::FeatureUnsupported, + "parsing of positional deletes is not yet supported", + )) } /// Parses record batch streams from individual equality delete files @@ -370,7 +373,10 @@ impl CachingDeleteFileManager { ) -> Result { // TODO - Ok(AlwaysTrue) + Err(Error::new( + ErrorKind::FeatureUnsupported, + "parsing of equality deletes is not yet supported", + )) } /// Builds eq delete predicate for the provided task. @@ -471,15 +477,16 @@ mod tests { .unwrap(); // Note that with the delete file parsing not yet in place, all we can test here is that - // the call to the loader does not fail. + // the call to the loader fails with the expected FeatureUnsupportedError. 
let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10); let file_scan_tasks = setup(table_location); - delete_file_manager + let result = delete_file_manager .load_deletes(&file_scan_tasks[0].deletes) - .await - .unwrap(); + .await; + + assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); } fn setup(table_location: &Path) -> Vec { diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index fe0ef2fb8..064419c85 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -307,7 +307,7 @@ impl ArrowReader { if let Some(positional_delete_indexes) = positional_delete_indexes { let delete_row_selection = { let positional_delete_indexes = positional_delete_indexes.read().unwrap(); - + Self::build_deletes_row_selection( record_batch_stream_builder.metadata().row_groups(), &selected_row_group_indices, diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index 1dba780e1..fa8ef6934 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -39,7 +39,7 @@ impl DeleteVector { DeleteVectorIterator { outer, inner: None } } - pub fn intersect_assign(&mut self, other: &DeleteVector) { + pub(crate) fn intersect_assign(&mut self, other: &DeleteVector) { self.inner.bitor_assign(&other.inner); } } From 5f6036ddbd8a83d6ee71acf6229b2a3122fad1d4 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 23 Apr 2025 21:48:14 +0100 Subject: [PATCH 4/5] feat: schema evolution of equality delete file record batches --- .../iceberg/src/arrow/delete_file_manager.rs | 52 ++++++++++++++++--- crates/iceberg/src/arrow/reader.rs | 13 +++-- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs index 7eb84aff7..96034d917 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_manager.rs @@ -25,13 +25,14 @@ use futures::channel::oneshot; use futures::future::join_all; use futures::{StreamExt, TryStreamExt}; +use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::ArrowReader; use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile}; -use crate::spec::DataContentType; +use crate::spec::{DataContentType, Schema, SchemaRef}; use crate::{Error, ErrorKind, Result}; #[allow(unused)] @@ -164,7 +165,7 @@ impl CachingDeleteFileManager { /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot /// channel to store them in the right place in the delete file managers state. /// * The results of all of these futures are awaited on in parallel with the specified - /// level of concurrency and collected into a vec. We then combine all of the delete + /// level of concurrency and collected into a vec. We then combine all the delete /// vector maps that resulted from any positional delete or delete vector files into a /// single map and persist it in the state. 
/// @@ -206,10 +207,18 @@ impl CachingDeleteFileManager { pub(crate) async fn load_deletes( &self, delete_file_entries: &[FileScanTaskDeleteFile], + schema: SchemaRef, ) -> Result<()> { let stream_items = delete_file_entries .iter() - .map(|t| (t.clone(), self.file_io.clone(), self.state.clone())) + .map(|t| { + ( + t.clone(), + self.file_io.clone(), + self.state.clone(), + schema.clone(), + ) + }) .collect::>(); // NOTE: removing the collect and just passing the iterator to futures::stream:iter // results in an error 'implementation of `std::ops::FnOnce` is not general enough' @@ -217,8 +226,8 @@ impl CachingDeleteFileManager { let task_stream = futures::stream::iter(stream_items.into_iter()); let results: Vec = task_stream - .map(move |(task, file_io, state_ref)| async { - Self::load_file_for_task(task, file_io, state_ref).await + .map(move |(task, file_io, state_ref, schema)| async { + Self::load_file_for_task(task, file_io, state_ref, schema).await }) .map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await })) .try_buffer_unordered(self.concurrency_limit_data_files) @@ -248,6 +257,7 @@ impl CachingDeleteFileManager { task: FileScanTaskDeleteFile, file_io: FileIO, state: StateRef, + schema: SchemaRef, ) -> Result { match task.file_type { DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels( @@ -271,7 +281,11 @@ impl CachingDeleteFileManager { }; Ok(DeleteFileContext::FreshEqDel { - batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + batch_stream: Self::evolve_schema( + Self::parquet_to_batch_stream(&task.file_path, file_io).await?, + schema, + ) + .await?, sender, }) } @@ -351,6 +365,30 @@ impl CachingDeleteFileManager { Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } + /// Evolves the schema of the RecordBatches from an equality delete file + async fn evolve_schema( + record_batch_stream: ArrowRecordBatchStream, + target_schema: Arc, + ) -> Result { + let eq_ids = target_schema + .as_ref() + .field_id_to_name_map() + .keys() + .cloned() + .collect::>(); + + let mut record_batch_transformer = + RecordBatchTransformer::build(target_schema.clone(), &eq_ids); + + let record_batch_stream = record_batch_stream.map(move |record_batch| { + record_batch.and_then(|record_batch| { + record_batch_transformer.process_record_batch(record_batch) + }) + }); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } + /// Parses a record batch stream coming from positional delete files /// /// Returns a map of data file path to a delete vector @@ -483,7 +521,7 @@ mod tests { let file_scan_tasks = setup(table_location); let result = delete_file_manager - .load_deletes(&file_scan_tasks[0].deletes) + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) .await; assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 064419c85..020f89f1e 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -200,11 +200,14 @@ impl ArrowReader { // concurrently retrieve delete files and create RecordBatchStreamBuilder let (_, mut record_batch_stream_builder) = try_join!( - delete_file_manager.load_deletes(if delete_file_support_enabled { - &task.deletes - } else { - &[] - },), + delete_file_manager.load_deletes( + if delete_file_support_enabled { + &task.deletes + } else { + &[] + }, + task.schema.clone() + ), Self::create_parquet_record_batch_stream_builder( 
&task.data_file_path, file_io.clone(), From 3804bda465135c305c1142c137b69fb7993a98b1 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 30 Apr 2025 20:07:35 +0100 Subject: [PATCH 5/5] refactor: split DeleteFileManager into DeleteFileLoader and DeleteFilter --- ..._file_manager.rs => delete_file_loader.rs} | 276 +++++--------- crates/iceberg/src/arrow/delete_filter.rs | 337 ++++++++++++++++++ crates/iceberg/src/arrow/mod.rs | 3 +- crates/iceberg/src/arrow/reader.rs | 55 ++- crates/iceberg/src/delete_vector.rs | 10 +- 5 files changed, 454 insertions(+), 227 deletions(-) rename crates/iceberg/src/arrow/{delete_file_manager.rs => delete_file_loader.rs} (70%) create mode 100644 crates/iceberg/src/arrow/delete_filter.rs diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_loader.rs similarity index 70% rename from crates/iceberg/src/arrow/delete_file_manager.rs rename to crates/iceberg/src/arrow/delete_file_loader.rs index 96034d917..663b7e77c 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -16,100 +16,57 @@ // under the License. use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; -use std::sync::{Arc, OnceLock, RwLock}; -use std::task::{Context, Poll}; +use std::sync::Arc; use futures::channel::oneshot; use futures::future::join_all; use futures::{StreamExt, TryStreamExt}; +use tokio::sync::oneshot::{channel, Receiver}; +use super::delete_filter::{DeleteFilter, EqDelFuture}; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::ArrowReader; use crate::delete_vector::DeleteVector; -use crate::expr::Predicate::AlwaysTrue; -use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::expr::Predicate; use crate::io::FileIO; -use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile}; +use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, Schema, SchemaRef}; use crate::{Error, ErrorKind, Result}; #[allow(unused)] -pub trait DeleteFileManager { +pub trait DeleteFileLoader { /// Read the delete file referred to in the task /// - /// Returns the raw contents of the delete file as a RecordBatch stream - fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result; + /// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution. + async fn read_delete_file( + &self, + task: &FileScanTaskDeleteFile, + schema: SchemaRef, + ) -> Result; } #[allow(unused)] #[derive(Clone, Debug)] -pub(crate) struct CachingDeleteFileManager { +pub(crate) struct CachingDeleteFileLoader { file_io: FileIO, concurrency_limit_data_files: usize, - state: Arc>, -} - -impl DeleteFileManager for CachingDeleteFileManager { - fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result { - // TODO, implementation in https://github.com/apache/iceberg-rust/pull/982 - - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Reading delete files is not yet supported", - )) - } -} -// Equality deletes may apply to more than one DataFile in a scan, and so -// the same equality delete file may be present in more than one invocation of -// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these -// to avoid having to load them twice, so we immediately store cloneable futures in the -// state that can be awaited upon to get te EQ deletes. 
That way we can check to see if -// a load of each Eq delete file is already in progress and avoid starting another one. -#[derive(Debug, Clone)] -struct EqDelFuture { - result: OnceLock, -} - -impl EqDelFuture { - pub fn new() -> (oneshot::Sender, Self) { - let (tx, rx) = oneshot::channel(); - let result = OnceLock::new(); - - crate::runtime::spawn({ - let result = result.clone(); - async move { result.set(rx.await.unwrap()) } - }); - - (tx, Self { result }) - } + del_filter: DeleteFilter, } -impl Future for EqDelFuture { - type Output = Predicate; +impl DeleteFileLoader for CachingDeleteFileLoader { + async fn read_delete_file( + &self, + task: &FileScanTaskDeleteFile, + schema: SchemaRef, + ) -> Result { + let raw_batch_stream = + CachingDeleteFileLoader::parquet_to_batch_stream(&task.file_path, self.file_io.clone()) + .await?; - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - match self.result.get() { - None => Poll::Pending, - Some(predicate) => Poll::Ready(predicate.clone()), - } + Self::evolve_schema(raw_batch_stream, schema).await } } -#[derive(Debug, Default)] -struct DeleteFileManagerState { - // delete vectors and positional deletes get merged when loaded into a single delete vector - // per data file - delete_vectors: HashMap>>, - - // equality delete files are parsed into unbound `Predicate`s. We store them here as - // cloneable futures (see note below) - equality_deletes: HashMap, -} - -type StateRef = Arc>; - // Intermediate context during processing of a delete file task. enum DeleteFileContext { // TODO: Delete Vector loader from Puffin files @@ -130,12 +87,12 @@ enum ParsedDeleteFileContext { } #[allow(unused_variables)] -impl CachingDeleteFileManager { +impl CachingDeleteFileLoader { pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self { - CachingDeleteFileManager { + CachingDeleteFileLoader { file_io, concurrency_limit_data_files, - state: Arc::new(Default::default()), + del_filter: DeleteFilter::default(), } } @@ -169,7 +126,7 @@ impl CachingDeleteFileManager { /// vector maps that resulted from any positional delete or delete vector files into a /// single map and persist it in the state. /// - /// + /// /// Conceptually, the data flow is like this: /// ```none /// FileScanTaskDeleteFile @@ -204,59 +161,74 @@ impl CachingDeleteFileManager { /// | /// [join!] 
/// ``` - pub(crate) async fn load_deletes( + pub(crate) fn load_deletes( &self, delete_file_entries: &[FileScanTaskDeleteFile], schema: SchemaRef, - ) -> Result<()> { + ) -> Receiver> { + let (tx, rx) = channel(); + let stream_items = delete_file_entries .iter() .map(|t| { ( t.clone(), self.file_io.clone(), - self.state.clone(), + self.del_filter.clone(), schema.clone(), ) }) .collect::>(); - // NOTE: removing the collect and just passing the iterator to futures::stream:iter - // results in an error 'implementation of `std::ops::FnOnce` is not general enough' - - let task_stream = futures::stream::iter(stream_items.into_iter()); + let task_stream = futures::stream::iter(stream_items); + let del_filter = self.del_filter.clone(); + let concurrency_limit_data_files = self.concurrency_limit_data_files; + crate::runtime::spawn(async move { + let result = async move { + let mut del_filter = del_filter; + + let results: Vec = task_stream + .map(move |(task, file_io, del_filter, schema)| async move { + Self::load_file_for_task(&task, file_io, del_filter, schema).await + }) + .map(move |ctx| { + Ok(async { Self::parse_file_content_for_task(ctx.await?).await }) + }) + .try_buffer_unordered(concurrency_limit_data_files) + .try_collect::>() + .await?; + + // wait for all in-progress EQ deletes from other tasks + let _ = join_all(results.iter().filter_map(|i| { + if let ParsedDeleteFileContext::InProgEqDel(fut) = i { + Some(fut.clone()) + } else { + None + } + })) + .await; + + for item in results { + if let ParsedDeleteFileContext::DelVecs(hash_map) = item { + for (data_file_path, delete_vector) in hash_map.into_iter() { + del_filter.upsert_delete_vector(data_file_path, delete_vector); + } + } + } - let results: Vec = task_stream - .map(move |(task, file_io, state_ref, schema)| async { - Self::load_file_for_task(task, file_io, state_ref, schema).await - }) - .map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await })) - .try_buffer_unordered(self.concurrency_limit_data_files) - .try_collect::>() - .await?; - - // wait for all in-progress EQ deletes from other tasks - let _ = join_all(results.iter().filter_map(|i| { - if let ParsedDeleteFileContext::InProgEqDel(fut) = i { - Some(fut.clone()) - } else { - None + Ok(del_filter) } - })) - .await; - - let merged_delete_vectors = results - .into_iter() - .fold(HashMap::default(), Self::merge_delete_vectors); + .await; - self.state.write().unwrap().delete_vectors = merged_delete_vectors; + let _ = tx.send(result); + }); - Ok(()) + rx } async fn load_file_for_task( - task: FileScanTaskDeleteFile, + task: &FileScanTaskDeleteFile, file_io: FileIO, - state: StateRef, + del_filter: DeleteFilter, schema: SchemaRef, ) -> Result { match task.file_type { @@ -266,16 +238,15 @@ impl CachingDeleteFileManager { DataContentType::EqualityDeletes => { let sender = { - let mut state = state.write().unwrap(); - if let Some(existing) = state.equality_deletes.get(&task.file_path) { + if let Some(existing) = del_filter + .get_equality_delete_predicate_for_delete_file_path(&task.file_path) + { return Ok(DeleteFileContext::InProgEqDel(existing.clone())); } let (sender, fut) = EqDelFuture::new(); - state - .equality_deletes - .insert(task.file_path.to_string(), fut); + del_filter.insert_equality_delete(task.file_path.to_string(), fut); sender }; @@ -327,23 +298,6 @@ impl CachingDeleteFileManager { } } - fn merge_delete_vectors( - mut merged_delete_vectors: HashMap>>, - item: ParsedDeleteFileContext, - ) -> HashMap>> { - if let 
ParsedDeleteFileContext::DelVecs(del_vecs) = item { - del_vecs.into_iter().for_each(|(key, val)| { - let entry = merged_delete_vectors.entry(key).or_default(); - { - let mut inner = entry.write().unwrap(); - (*inner).intersect_assign(&val); - } - }); - } - - merged_delete_vectors - } - /// Loads a RecordBatchStream for a given datafile. async fn parquet_to_batch_stream( data_file_path: &str, @@ -416,72 +370,6 @@ impl CachingDeleteFileManager { "parsing of equality deletes is not yet supported", )) } - - /// Builds eq delete predicate for the provided task. - /// - /// Must await on load_deletes before calling this. - pub(crate) async fn build_delete_predicate_for_task( - &self, - file_scan_task: &FileScanTask, - ) -> Result> { - // * Filter the task's deletes into just the Equality deletes - // * Retrieve the unbound predicate for each from self.state.equality_deletes - // * Logical-AND them all together to get a single combined `Predicate` - // * Bind the predicate to the task's schema to get a `BoundPredicate` - - let mut combined_predicate = AlwaysTrue; - for delete in &file_scan_task.deletes { - if !is_equality_delete(delete) { - continue; - } - - let predicate = { - let state = self.state.read().unwrap(); - - let Some(predicate) = state.equality_deletes.get(&delete.file_path) else { - return Err(Error::new( - ErrorKind::Unexpected, - format!( - "Missing predicate for equality delete file '{}'", - delete.file_path - ), - )); - }; - - predicate.clone() - }; - - combined_predicate = combined_predicate.and(predicate.await); - } - - if combined_predicate == AlwaysTrue { - return Ok(None); - } - - // TODO: handle case-insensitive case - let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; - Ok(Some(bound_predicate)) - } - - /// Retrieve a delete vector for the data file associated with a given file scan task - /// - /// Should only be called after awaiting on load_deletes. Takes the vector to avoid a - /// clone since each item is specific to a single data file and won't need to be used again - pub(crate) fn get_delete_vector_for_task( - &self, - file_scan_task: &FileScanTask, - ) -> Option>> { - self.state - .write() - .unwrap() - .delete_vectors - .get(file_scan_task.data_file_path()) - .map(Clone::clone) - } -} - -pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { - matches!(f.file_type, DataContentType::EqualityDeletes) } #[cfg(test)] @@ -498,6 +386,7 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::scan::FileScanTask; use crate::spec::{DataFileFormat, Schema}; type ArrowSchemaRef = Arc; @@ -516,13 +405,14 @@ mod tests { // Note that with the delete file parsing not yet in place, all we can test here is that // the call to the loader fails with the expected FeatureUnsupportedError. 
- let delete_file_manager = CachingDeleteFileManager::new(file_io.clone(), 10); + let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); let file_scan_tasks = setup(table_location); let result = delete_file_manager .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) - .await; + .await + .unwrap(); assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); } diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs new file mode 100644 index 000000000..aee69c3cc --- /dev/null +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex, OnceLock, RwLock}; +use std::task::{Context, Poll}; + +use futures::channel::oneshot; + +use crate::delete_vector::DeleteVector; +use crate::expr::Predicate::AlwaysTrue; +use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; +use crate::spec::DataContentType; +use crate::{Error, ErrorKind, Result}; + +// Equality deletes may apply to more than one DataFile in a scan, and so +// the same equality delete file may be present in more than one invocation of +// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these +// to avoid having to load them twice, so we immediately store cloneable futures in the +// state that can be awaited upon to get te EQ deletes. That way we can check to see if +// a load of each Eq delete file is already in progress and avoid starting another one. 
+#[derive(Debug, Clone)] +pub(crate) struct EqDelFuture { + result: OnceLock, +} + +impl EqDelFuture { + pub(crate) fn new() -> (oneshot::Sender, Self) { + let (tx, rx) = oneshot::channel(); + let result = OnceLock::new(); + + crate::runtime::spawn({ + let result = result.clone(); + async move { result.set(rx.await.unwrap()) } + }); + + (tx, Self { result }) + } +} + +impl Future for EqDelFuture { + type Output = Predicate; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + match self.result.get() { + None => Poll::Pending, + Some(predicate) => Poll::Ready(predicate.clone()), + } + } +} + +#[derive(Debug, Default)] +struct DeleteFileFilterState { + delete_vectors: HashMap>>, + equality_deletes: HashMap, +} + +#[derive(Clone, Debug, Default)] +pub struct DeleteFilter { + state: Arc>, +} + +impl DeleteFilter { + /// Retrieve a delete vector for the data file associated with a given file scan task + pub fn get_delete_vector( + &self, + file_scan_task: &FileScanTask, + ) -> Option>> { + self.get_delete_vector_for_path(file_scan_task.data_file_path()) + } + + /// Retrieve a delete vector for a data file + pub fn get_delete_vector_for_path( + &self, + delete_file_path: &str, + ) -> Option>> { + self.state + .read() + .ok() + .and_then(|st| st.delete_vectors.get(delete_file_path).cloned()) + } + + /// Retrieve the equality delete predicate for a given eq delete file path + pub(crate) fn get_equality_delete_predicate_for_delete_file_path( + &self, + file_path: &str, + ) -> Option { + self.state + .read() + .unwrap() + .equality_deletes + .get(file_path) + .cloned() + } + + /// Builds eq delete predicate for the provided task. + pub async fn build_equality_delete_predicate( + &self, + file_scan_task: &FileScanTask, + ) -> Result> { + // * Filter the task's deletes into just the Equality deletes + // * Retrieve the unbound predicate for each from self.state.equality_deletes + // * Logical-AND them all together to get a single combined `Predicate` + // * Bind the predicate to the task's schema to get a `BoundPredicate` + + let mut combined_predicate = AlwaysTrue; + for delete in &file_scan_task.deletes { + if !is_equality_delete(delete) { + continue; + } + + let Some(predicate) = + self.get_equality_delete_predicate_for_delete_file_path(&delete.file_path) + else { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Missing predicate for equality delete file '{}'", + delete.file_path + ), + )); + }; + + combined_predicate = combined_predicate.and(predicate.await); + } + + if combined_predicate == AlwaysTrue { + return Ok(None); + } + + // TODO: handle case-insensitive case + let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; + Ok(Some(bound_predicate)) + } + + pub(crate) fn upsert_delete_vector( + &mut self, + data_file_path: String, + delete_vector: DeleteVector, + ) { + let mut state = self.state.write().unwrap(); + + let Some(entry) = state.delete_vectors.get_mut(&data_file_path) else { + state + .delete_vectors + .insert(data_file_path, Arc::new(Mutex::new(delete_vector))); + return; + }; + + *entry.lock().unwrap() |= delete_vector; + } + + pub(crate) fn insert_equality_delete(&self, delete_file_path: String, eq_del: EqDelFuture) { + let mut state = self.state.write().unwrap(); + + state.equality_deletes.insert(delete_file_path, eq_del); + } +} + +pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { + matches!(f.file_type, DataContentType::EqualityDeletes) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use 
std::path::Path; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema as ArrowSchema; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::arrow::delete_file_loader::CachingDeleteFileLoader; + use crate::io::FileIO; + use crate::spec::{DataFileFormat, Schema}; + + type ArrowSchemaRef = Arc; + + const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; + const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + + #[tokio::test] + async fn test_delete_file_manager_load_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + // Note that with the delete file parsing not yet in place, all we can test here is that + // the call to the loader fails with the expected FeatureUnsupportedError. + let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); + + let file_scan_tasks = setup(table_location); + + let result = delete_file_manager + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap(); + + assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + } + + fn setup(table_location: &Path) -> Vec { + let data_file_schema = Arc::new(Schema::builder().build().unwrap()); + let positional_delete_schema = create_pos_del_schema(); + + let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; + let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; + + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let positional_deletes_to_write = + RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col.clone(), + pos_col.clone(), + ]) + .unwrap(); + + let file = File::create(format!( + "{}/pos-del-{}.parquet", + table_location.to_str().unwrap(), + n + )) + .unwrap(); + let mut writer = ArrowWriter::try_new( + file, + positional_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&positional_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + + let pos_del_1 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_2 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let pos_del_3 = FileScanTaskDeleteFile { + file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: vec![], + }; + + let file_scan_tasks = vec![ + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + 
deletes: vec![pos_del_1, pos_del_2.clone()], + }, + FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "".to_string(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![pos_del_2, pos_del_3], + }, + ]; + + file_scan_tasks + } + + fn create_pos_del_schema() -> ArrowSchemaRef { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + FIELD_ID_POSITIONAL_DELETE_POS.to_string(), + )]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + } +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 56caeaf55..c5c144853 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -23,7 +23,8 @@ pub use schema::*; mod nan_val_cnt_visitor; pub(crate) use nan_val_cnt_visitor::*; -pub(crate) mod delete_file_manager; +pub(crate) mod delete_file_loader; +pub(crate) mod delete_filter; mod reader; pub(crate) mod record_batch_projector; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 020f89f1e..9e1f4d33a 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -41,7 +41,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use crate::arrow::delete_file_manager::CachingDeleteFileManager; +use crate::arrow::delete_file_loader::CachingDeleteFileLoader; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -117,7 +117,7 @@ impl ArrowReaderBuilder { ArrowReader { batch_size: self.batch_size, file_io: self.file_io.clone(), - delete_file_manager: CachingDeleteFileManager::new( + delete_file_loader: CachingDeleteFileLoader::new( self.file_io.clone(), self.concurrency_limit_data_files, ), @@ -134,7 +134,7 @@ impl ArrowReaderBuilder { pub struct ArrowReader { batch_size: Option, file_io: FileIO, - delete_file_manager: CachingDeleteFileManager, + delete_file_loader: CachingDeleteFileLoader, /// the maximum number of data files that can be fetched at the same time concurrency_limit_data_files: usize, @@ -163,7 +163,7 @@ impl ArrowReader { task, batch_size, file_io, - self.delete_file_manager.clone(), + self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, delete_file_support_enabled, @@ -183,7 +183,7 @@ impl ArrowReader { task: FileScanTask, batch_size: Option, file_io: FileIO, - delete_file_manager: CachingDeleteFileManager, + delete_file_loader: CachingDeleteFileLoader, row_group_filtering_enabled: bool, row_selection_enabled: bool, delete_file_support_enabled: bool, @@ -198,22 +198,20 @@ impl ArrowReader { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); - // concurrently retrieve delete files and create RecordBatchStreamBuilder - let (_, mut record_batch_stream_builder) = try_join!( - 
delete_file_manager.load_deletes( - if delete_file_support_enabled { - &task.deletes - } else { - &[] - }, - task.schema.clone() - ), - Self::create_parquet_record_batch_stream_builder( - &task.data_file_path, - file_io.clone(), - should_load_page_index, - ) - )?; + let delete_filter_rx = delete_file_loader.load_deletes( + if delete_file_support_enabled { + &task.deletes + } else { + &[] + }, + task.schema.clone(), + ); + let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder( + &task.data_file_path, + file_io.clone(), + should_load_page_index, + ) + .await?; // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response @@ -235,9 +233,8 @@ impl ArrowReader { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } - let delete_predicate = delete_file_manager - .build_delete_predicate_for_task(&task) - .await?; + let delete_filter = delete_filter_rx.await.unwrap()?; + let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?; // In addition to the optional predicate supplied in the `FileScanTask`, // we also have an optional predicate resulting from equality delete files. @@ -305,18 +302,18 @@ impl ArrowReader { } } - let positional_delete_indexes = delete_file_manager.get_delete_vector_for_task(&task); + let positional_delete_indexes = delete_filter.get_delete_vector(&task); if let Some(positional_delete_indexes) = positional_delete_indexes { let delete_row_selection = { - let positional_delete_indexes = positional_delete_indexes.read().unwrap(); + let positional_delete_indexes = positional_delete_indexes.lock().unwrap(); Self::build_deletes_row_selection( record_batch_stream_builder.metadata().row_groups(), &selected_row_group_indices, &positional_delete_indexes, - )? - }; + ) + }?; // merge the row selection from the delete files with the row selection // from the filter predicate, if there is one from the filter predicate @@ -1749,7 +1746,7 @@ message schema { /* cases to cover: * {skip|select} {first|intermediate|last} {one row|multiple rows} in - {first|imtermediate|last} {skipped|selected} row group + {first|intermediate|last} {skipped|selected} row group * row group selection disabled */ diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index fa8ef6934..feb4eeea9 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -38,10 +38,6 @@ impl DeleteVector { let outer = self.inner.bitmaps(); DeleteVectorIterator { outer, inner: None } } - - pub(crate) fn intersect_assign(&mut self, other: &DeleteVector) { - self.inner.bitor_assign(&other.inner); - } } // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here. @@ -109,3 +105,9 @@ impl DeleteVectorIterator<'_> { inner.bitmap_iter.advance_to(lo); } } + +impl BitOrAssign for DeleteVector { + fn bitor_assign(&mut self, other: Self) { + self.inner.bitor_assign(&other.inner); + } +}
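+
+// Usage sketch (illustrative only, not part of this file): with `|=` available,
+// positional deletes for the same data file that come from different delete files
+// can be merged in place, as `DeleteFilter::upsert_delete_vector` does. The
+// `existing` and `newly_loaded_delete_vector` names below are assumptions.
+//
+//     let existing: Arc<Mutex<DeleteVector>> = /* entry in the DeleteFilter state */;
+//     *existing.lock().unwrap() |= newly_loaded_delete_vector;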