Scan Delete Support Part 4: Delete File Loading; Skeleton for Processing #982

Open · wants to merge 12 commits into base: main
11 changes: 11 additions & 0 deletions Cargo.lock


324 changes: 324 additions & 0 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -0,0 +1,324 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use futures::{StreamExt, TryStreamExt};
use tokio::sync::oneshot::{Receiver, channel};

use super::delete_filter::DeleteFilter;
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
use crate::delete_vector::DeleteVector;
use crate::expr::Predicate;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, SchemaRef};
use crate::{Error, ErrorKind, Result};

#[derive(Clone, Debug)]
pub(crate) struct CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader,
concurrency_limit_data_files: usize,
}

// Intermediate context during processing of a delete file task.
enum DeleteFileContext {
// TODO: Delete Vector loader from Puffin files
ExistingEqDel,
PosDels(ArrowRecordBatchStream),
FreshEqDel {
batch_stream: ArrowRecordBatchStream,
sender: tokio::sync::oneshot::Sender<Predicate>,
},
}

// Final result of processing a delete file task, before the results are
// fully merged into the delete filter's state
enum ParsedDeleteFileContext {
DelVecs(HashMap<String, DeleteVector>),
EqDel,
}

#[allow(unused_variables)]
impl CachingDeleteFileLoader {
pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
concurrency_limit_data_files,
}
}

/// Initiates loading of all deletes for all the specified tasks
///
/// The returned future completes once all positional deletes and delete vectors
/// have been loaded. EQ deletes are not awaited by this method; instead, the returned
/// DeleteFilter will await their loading when queried for them.
///
/// * Create a single stream of all delete file tasks irrespective of type,
/// so that we can respect the combined concurrency limit
/// * We then process each in two phases: load and parse.
/// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
/// stream the file contents out
/// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
/// another concurrently processing data file scan task. If it is, we skip it.
/// If not, the DeleteFilter is updated to contain a notifier to prevent other data file
/// tasks from starting to load the same equality delete file. We spawn a task to load
/// the EQ delete's record batch stream, convert it to a predicate, update the delete filter,
/// and notify any task that was waiting for it.
/// * When this gets updated to add support for delete vectors, the load phase will return
Contributor:
Looking forward to the puffin / deletion vector support!

Contributor Author:
Me too! 😁

/// a PuffinReader for them.
/// * The parse phase parses each record batch stream according to its associated data type.
/// The result of this is a map of data file paths to delete vectors for the positional
/// delete tasks (and in future for the delete vector tasks). For equality delete
/// file tasks, this results in an unbound Predicate.
/// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
/// channel so that they end up in the right place in the delete filter's state.
/// * The results of all of these futures are awaited in parallel with the specified
/// level of concurrency and collected into a vec. We then combine all the delete
/// vector maps that resulted from any positional delete or delete vector files into a
/// single map and persist it in the state.
///
///
/// Conceptually, the data flow is like this:
/// ```none
/// FileScanTaskDeleteFile
/// |
/// Skip Started EQ Deletes
/// |
/// |
/// [load recordbatch stream / puffin]
/// DeleteFileContext
/// |
/// |
/// +-----------------------------+--------------------------+
/// Pos Del Del Vec (Not yet Implemented) EQ Del
/// | | |
/// [parse pos del stream] [parse del vec puffin] [parse eq del]
/// HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
/// | | |
/// | | [persist to state]
/// | | ()
/// | | |
/// +-----------------------------+--------------------------+
/// |
/// [buffer unordered]
/// |
/// [combine del vectors]
/// HashMap<String, RoaringTreeMap>
/// |
/// [persist del vectors to state]
/// ()
/// |
/// |
/// [join!]
/// ```
pub(crate) fn load_deletes(
&self,
delete_file_entries: &[FileScanTaskDeleteFile],
schema: SchemaRef,
) -> Receiver<Result<DeleteFilter>> {
let (tx, rx) = channel();
let del_filter = DeleteFilter::default();

let stream_items = delete_file_entries
.iter()
.map(|t| {
(
t.clone(),
self.basic_delete_file_loader.clone(),
del_filter.clone(),
schema.clone(),
)
})
.collect::<Vec<_>>();
let task_stream = futures::stream::iter(stream_items);

let del_filter = del_filter.clone();
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let basic_delete_file_loader = self.basic_delete_file_loader.clone();
crate::runtime::spawn(async move {
let result = async move {
let mut del_filter = del_filter;
let basic_delete_file_loader = basic_delete_file_loader.clone();

let results: Vec<ParsedDeleteFileContext> = task_stream
.map(move |(task, file_io, del_filter, schema)| {
let basic_delete_file_loader = basic_delete_file_loader.clone();
async move {
Self::load_file_for_task(
&task,
basic_delete_file_loader.clone(),
del_filter,
schema,
)
.await
}
})
.map(move |ctx| {
Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
})
.try_buffer_unordered(concurrency_limit_data_files)
.try_collect::<Vec<_>>()
.await?;

for item in results {
if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
for (data_file_path, delete_vector) in hash_map.into_iter() {
del_filter.upsert_delete_vector(data_file_path, delete_vector);
}
}
}

Ok(del_filter)
}
.await;

let _ = tx.send(result);
});

rx
}

async fn load_file_for_task(
task: &FileScanTaskDeleteFile,
basic_delete_file_loader: BasicDeleteFileLoader,
del_filter: DeleteFilter,
schema: SchemaRef,
) -> Result<DeleteFileContext> {
match task.file_type {
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path)
.await?,
)),

DataContentType::EqualityDeletes => {
let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
return Ok(DeleteFileContext::ExistingEqDel);
};

let (sender, receiver) = channel();
del_filter.insert_equality_delete(&task.file_path, receiver);

Ok(DeleteFileContext::FreshEqDel {
batch_stream: BasicDeleteFileLoader::evolve_schema(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path)
.await?,
schema,
)
.await?,
sender,
})
}

DataContentType::Data => Err(Error::new(
ErrorKind::Unexpected,
"tasks with files of type Data not expected here",
)),
}
}

async fn parse_file_content_for_task(
ctx: DeleteFileContext,
) -> Result<ParsedDeleteFileContext> {
match ctx {
DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
DeleteFileContext::PosDels(batch_stream) => {
let del_vecs =
Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
}
DeleteFileContext::FreshEqDel {
sender,
batch_stream,
} => {
let predicate =
Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;

sender
.send(predicate)
.map_err(|err| {
Error::new(
ErrorKind::Unexpected,
"Could not send eq delete predicate to state",
)
})
.map(|_| ParsedDeleteFileContext::EqDel)
}
}
}

/// Parses a record batch stream coming from positional delete files
///
/// Returns a map of data file path to a delete vector
async fn parse_positional_deletes_record_batch_stream(
stream: ArrowRecordBatchStream,
) -> Result<HashMap<String, DeleteVector>> {
// TODO

Err(Error::new(
ErrorKind::FeatureUnsupported,
"parsing of positional deletes is not yet supported",
))
}

/// Parses record batch streams from individual equality delete files
///
/// Returns an unbound Predicate for each batch stream
async fn parse_equality_deletes_record_batch_stream(
streams: ArrowRecordBatchStream,
) -> Result<Predicate> {
// TODO

Err(Error::new(
ErrorKind::FeatureUnsupported,
"parsing of equality deletes is not yet supported",
))
}
}

#[cfg(test)]
mod tests {
use tempfile::TempDir;

use super::*;
use crate::arrow::delete_file_loader::tests::setup;

#[tokio::test]
async fn test_delete_file_manager_load_deletes() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Note that with the delete file parsing not yet in place, all we can test here is that
// the call to the loader fails with the expected FeatureUnsupportedError.
let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);

let file_scan_tasks = setup(table_location);

let result = delete_file_manager
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.await
.unwrap();

assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
}
}
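
As an editorial aside on the pipeline described in the doc comment above: the load-then-parse flow with a capped number of in-flight tasks boils down to the `futures` pattern sketched below. The `load`/`parse` functions, the numeric tasks, and the tokio runtime are stand-ins for illustration and are not part of this PR.

```rust
use futures::{StreamExt, TryStreamExt};

// Stand-in for e.g. parquet_to_batch_stream(): asynchronously "load" a task.
async fn load(task: u32) -> Result<String, std::io::Error> {
    Ok(format!("loaded-{task}"))
}

// Stand-in for parse_file_content_for_task(): asynchronously "parse" the loaded result.
async fn parse(loaded: String) -> Result<String, std::io::Error> {
    Ok(format!("parsed-{loaded}"))
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let concurrency_limit = 4;

    let results: Vec<String> = futures::stream::iter(0..8u32)
        // Each item becomes a future that loads and then parses one task.
        .map(|task| Ok(async move { parse(load(task).await?).await }))
        // At most `concurrency_limit` of those futures are in flight at once.
        .try_buffer_unordered(concurrency_limit)
        .try_collect()
        .await?;

    assert_eq!(results.len(), 8);
    Ok(())
}
```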
273 changes: 273 additions & 0 deletions crates/iceberg/src/arrow/delete_file_loader.rs
@@ -0,0 +1,273 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};

use crate::arrow::ArrowReader;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
use crate::spec::{Schema, SchemaRef};
use crate::{Error, ErrorKind, Result};

/// Delete File Loader
#[allow(unused)]
#[async_trait::async_trait]
pub trait DeleteFileLoader {
/// Read the delete file referred to in the task
///
/// Returns the contents of the delete file as a RecordBatch stream. Applies schema evolution.
async fn read_delete_file(
&self,
task: &FileScanTaskDeleteFile,
schema: SchemaRef,
) -> Result<ArrowRecordBatchStream>;
}

#[derive(Clone, Debug)]
pub(crate) struct BasicDeleteFileLoader {
file_io: FileIO,
}

#[allow(unused_variables)]
impl BasicDeleteFileLoader {
pub fn new(file_io: FileIO) -> Self {
BasicDeleteFileLoader { file_io }
}
/// Loads a RecordBatchStream for a given datafile.
pub(crate) async fn parquet_to_batch_stream(
&self,
data_file_path: &str,
) -> Result<ArrowRecordBatchStream> {
/*
Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
as that introduces a circular dependency.
*/
let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
data_file_path,
self.file_io.clone(),
false,
)
.await?
.build()?
.map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e)));

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

/// Evolves the schema of the RecordBatches from an equality delete file
pub(crate) async fn evolve_schema(
record_batch_stream: ArrowRecordBatchStream,
target_schema: Arc<Schema>,
) -> Result<ArrowRecordBatchStream> {
let eq_ids = target_schema
.as_ref()
.field_id_to_name_map()
.keys()
.cloned()
.collect::<Vec<_>>();

let mut record_batch_transformer =
RecordBatchTransformer::build(target_schema.clone(), &eq_ids);

let record_batch_stream = record_batch_stream.map(move |record_batch| {
record_batch.and_then(|record_batch| {
record_batch_transformer.process_record_batch(record_batch)
})
});

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}
}

#[async_trait::async_trait]
impl DeleteFileLoader for BasicDeleteFileLoader {
async fn read_delete_file(
&self,
task: &FileScanTaskDeleteFile,
schema: SchemaRef,
) -> Result<ArrowRecordBatchStream> {
let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;

Self::evolve_schema(raw_batch_stream, schema).await
}
}

#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::Schema as ArrowSchema;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use super::*;
use crate::scan::FileScanTask;
use crate::spec::{DataContentType, DataFileFormat, Schema};

type ArrowSchemaRef = Arc<ArrowSchema>;

const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

#[tokio::test]
async fn test_basic_delete_file_loader_read_delete_file() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Reading a positional delete file through the loader applies schema evolution and
// yields its contents back as a stream; here that stream collects into a single record batch.
let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());

let file_scan_tasks = setup(table_location);

let result = delete_file_loader
.read_delete_file(
&file_scan_tasks[0].deletes[0],
file_scan_tasks[0].schema_ref(),
)
.await
.unwrap();

let result = result.try_collect::<Vec<_>>().await.unwrap();

assert_eq!(result.len(), 1);
}

pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
let positional_delete_schema = create_pos_del_schema();

let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];

let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

for n in 1..=3 {
let positional_deletes_to_write =
RecordBatch::try_new(positional_delete_schema.clone(), vec![
file_path_col.clone(),
pos_col.clone(),
])
.unwrap();

let file = File::create(format!(
"{}/pos-del-{}.parquet",
table_location.to_str().unwrap(),
n
))
.unwrap();
let mut writer = ArrowWriter::try_new(
file,
positional_deletes_to_write.schema(),
Some(props.clone()),
)
.unwrap();

writer
.write(&positional_deletes_to_write)
.expect("Writing batch");

// writer must be closed to write footer
writer.close().unwrap();
}

let pos_del_1 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let pos_del_2 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let pos_del_3 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let file_scan_tasks = vec![
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
},
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_2, pos_del_3],
},
];

file_scan_tasks
}

pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
let fields = vec![
arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
)])),
arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata(
HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
)]),
),
];
Arc::new(arrow_schema::Schema::new(fields))
}
}
95 changes: 0 additions & 95 deletions crates/iceberg/src/arrow/delete_file_manager.rs

This file was deleted.

353 changes: 353 additions & 0 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -0,0 +1,353 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

use tokio::sync::Notify;
use tokio::sync::oneshot::Receiver;

use crate::delete_vector::DeleteVector;
use crate::expr::Predicate::AlwaysTrue;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
use crate::spec::DataContentType;
use crate::{Error, ErrorKind, Result};

#[derive(Debug)]
enum EqDelState {
Loading(Arc<Notify>),
Loaded(Predicate),
}

#[derive(Debug, Default)]
struct DeleteFileFilterState {
delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
equality_deletes: HashMap<String, EqDelState>,
}

#[derive(Clone, Debug, Default)]
pub(crate) struct DeleteFilter {
state: Arc<RwLock<DeleteFileFilterState>>,
}

impl DeleteFilter {
/// Retrieve a delete vector for the data file associated with a given file scan task
pub(crate) fn get_delete_vector(
&self,
file_scan_task: &FileScanTask,
) -> Option<Arc<Mutex<DeleteVector>>> {
self.get_delete_vector_for_path(file_scan_task.data_file_path())
}

/// Retrieve a delete vector for a data file
pub(crate) fn get_delete_vector_for_path(
&self,
delete_file_path: &str,
) -> Option<Arc<Mutex<DeleteVector>>> {
self.state
.read()
.ok()
.and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
}

pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
let mut state = self.state.write().unwrap();

if !state.equality_deletes.contains_key(file_path) {
return None;
}

let notifier = Arc::new(Notify::new());
state
.equality_deletes
.insert(file_path.to_string(), EqDelState::Loading(notifier.clone()));

Some(notifier)
}

/// Retrieve the equality delete predicate for a given eq delete file path
pub(crate) async fn get_equality_delete_predicate_for_delete_file_path(
&self,
file_path: &str,
) -> Option<Predicate> {
let notifier = {
match self.state.read().unwrap().equality_deletes.get(file_path) {
None => return None,
Some(EqDelState::Loading(notifier)) => notifier.clone(),
Some(EqDelState::Loaded(predicate)) => {
return Some(predicate.clone());
}
}
};

notifier.notified().await;

match self.state.read().unwrap().equality_deletes.get(file_path) {
Some(EqDelState::Loaded(predicate)) => Some(predicate.clone()),
_ => unreachable!("Cannot be any other state than loaded"),
}
}

/// Builds eq delete predicate for the provided task.
pub(crate) async fn build_equality_delete_predicate(
&self,
file_scan_task: &FileScanTask,
) -> Result<Option<BoundPredicate>> {
// * Filter the task's deletes into just the Equality deletes
// * Retrieve the unbound predicate for each from self.state.equality_deletes
// * Logical-AND them all together to get a single combined `Predicate`
// * Bind the predicate to the task's schema to get a `BoundPredicate`

let mut combined_predicate = AlwaysTrue;
for delete in &file_scan_task.deletes {
if !is_equality_delete(delete) {
continue;
}

let Some(predicate) = self
.get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
.await
else {
return Err(Error::new(
ErrorKind::Unexpected,
format!(
"Missing predicate for equality delete file '{}'",
delete.file_path
),
));
};

combined_predicate = combined_predicate.and(predicate);
}

if combined_predicate == AlwaysTrue {
return Ok(None);
}

// TODO: handle case-insensitive case
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
Ok(Some(bound_predicate))
}

pub(crate) fn upsert_delete_vector(
&mut self,
data_file_path: String,
delete_vector: DeleteVector,
) {
let mut state = self.state.write().unwrap();

let Some(entry) = state.delete_vectors.get_mut(&data_file_path) else {
state
.delete_vectors
.insert(data_file_path, Arc::new(Mutex::new(delete_vector)));
return;
};

*entry.lock().unwrap() |= delete_vector;
}

pub(crate) fn insert_equality_delete(
&self,
delete_file_path: &str,
eq_del: Receiver<Predicate>,
) {
let notify = Arc::new(Notify::new());
{
let mut state = self.state.write().unwrap();
state.equality_deletes.insert(
delete_file_path.to_string(),
EqDelState::Loading(notify.clone()),
);
}

let state = self.state.clone();
let delete_file_path = delete_file_path.to_string();
crate::runtime::spawn(async move {
let eq_del = eq_del.await.unwrap();
{
let mut state = state.write().unwrap();
state
.equality_deletes
.insert(delete_file_path, EqDelState::Loaded(eq_del));
}
notify.notify_waiters();
});
}
}

pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
matches!(f.file_type, DataContentType::EqualityDeletes)
}

#[cfg(test)]
mod tests {
use std::fs::File;
use std::path::Path;
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::Schema as ArrowSchema;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use super::*;
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::io::FileIO;
use crate::spec::{DataFileFormat, Schema};

type ArrowSchemaRef = Arc<ArrowSchema>;

const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

#[tokio::test]
async fn test_delete_file_manager_load_deletes() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Note that with the delete file parsing not yet in place, all we can test here is that
// the call to the loader fails with the expected FeatureUnsupportedError.
let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);

let file_scan_tasks = setup(table_location);

let result = delete_file_manager
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.await
.unwrap();

assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
}

fn setup(table_location: &Path) -> Vec<FileScanTask> {
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
let positional_delete_schema = create_pos_del_schema();

let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];

let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

for n in 1..=3 {
let positional_deletes_to_write =
RecordBatch::try_new(positional_delete_schema.clone(), vec![
file_path_col.clone(),
pos_col.clone(),
])
.unwrap();

let file = File::create(format!(
"{}/pos-del-{}.parquet",
table_location.to_str().unwrap(),
n
))
.unwrap();
let mut writer = ArrowWriter::try_new(
file,
positional_deletes_to_write.schema(),
Some(props.clone()),
)
.unwrap();

writer
.write(&positional_deletes_to_write)
.expect("Writing batch");

// writer must be closed to write footer
writer.close().unwrap();
}

let pos_del_1 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let pos_del_2 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let pos_del_3 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let file_scan_tasks = vec![
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
},
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_2, pos_del_3],
},
];

file_scan_tasks
}

fn create_pos_del_schema() -> ArrowSchemaRef {
let fields = vec![
arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
)])),
arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata(
HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
)]),
),
];
Arc::new(arrow_schema::Schema::new(fields))
}
}
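
The Loading/Loaded handshake above (try_start_eq_del_load, insert_equality_delete, and get_equality_delete_predicate_for_delete_file_path) rests on tokio's Notify. Below is a minimal, self-contained sketch of that handshake, not code from this PR: the State enum and the string payload are stand-ins for EqDelState and the parsed Predicate, and it assumes tokio with the rt, macros, sync, and time features.

```rust
use std::sync::{Arc, RwLock};

use tokio::sync::Notify;

#[derive(Debug)]
enum State {
    Loading(Arc<Notify>),
    Loaded(String), // stands in for EqDelState::Loaded(Predicate)
}

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let state = Arc::new(RwLock::new(State::Loading(notify.clone())));

    // Waiter: grabs the notifier while the state is still Loading, awaits it, then
    // re-reads the state, mirroring get_equality_delete_predicate_for_delete_file_path.
    let waiter = {
        let state = state.clone();
        tokio::spawn(async move {
            let notifier = match &*state.read().unwrap() {
                State::Loading(n) => n.clone(),
                State::Loaded(p) => return p.clone(),
            };
            notifier.notified().await;
            match &*state.read().unwrap() {
                State::Loaded(p) => p.clone(),
                State::Loading(_) => unreachable!("woken before the load completed"),
            }
        })
    };

    // Loader: publishes the parsed predicate, then wakes every task parked on the
    // Notify, mirroring the task spawned by insert_equality_delete.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    *state.write().unwrap() = State::Loaded("id = 1".to_string());
    notify.notify_waiters();

    assert_eq!(waiter.await.unwrap(), "id = 1");
}
```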
7 changes: 5 additions & 2 deletions crates/iceberg/src/arrow/mod.rs
@@ -22,12 +22,15 @@ pub use schema::*;

mod nan_val_cnt_visitor;
pub(crate) use nan_val_cnt_visitor::*;

pub(crate) mod delete_file_manager;
pub(crate) mod caching_delete_file_loader;
/// Delete File loader
pub mod delete_file_loader;
pub(crate) mod delete_filter;

mod reader;
pub(crate) mod record_batch_projector;
pub(crate) mod record_batch_transformer;
mod value;

pub use reader::*;
pub use value::*;
52 changes: 28 additions & 24 deletions crates/iceberg/src/arrow/reader.rs
@@ -42,7 +42,7 @@ use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder,
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::delete_file_manager::CachingDeleteFileManager;
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
@@ -110,7 +110,7 @@ impl ArrowReaderBuilder {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io.clone(),
delete_file_manager: CachingDeleteFileManager::new(
delete_file_loader: CachingDeleteFileLoader::new(
Contributor:
We should not store a loader; instead we should store a DeleteFilter. What should be called in ArrowReader should be something like the following:

deleteFilter.filter(recordBatchStream)

Contributor Author:
We're not making the best use of the features that ArrowRecordBatchReader provides us if we do that. We can't implement the delete filter as a simple filter on a RecordBatchStream unless we ditch parquet's ParquetRecordBatchStream.

We're using parquet's ParquetRecordBatchStream to do the predicate filtering before we even get access to the RecordBatchStream. So by the time we have a stream of RecordBatches, we can't apply positional deletes or delete vectors, because we no longer know which row number in the original file a record batch's row corresponds to; some rows may already have been filtered out.

Contributor Author:
I may be able to update ArrowReader to store a DeleteFileFilter rather than a DeleteFileLoader, but not to change the semantics so that the filter is used as deleteFilter.filter(recordBatchStream). Not without rewriting the ArrowReader entirely to avoid ParquetRecordBatchStream, and as a consequence reimplementing all of the predicate filtering, row selection, projection, page skipping, and row group skipping that it gives us.

Contributor:
I think I get your point, and you are right: maybe Java's interface is not the best fit for the Rust implementation. I think it's fine for us to move forward first and refactor later, as long as we don't expose public APIs.
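
For illustration only (not code from this PR): the API shape the reviewer is suggesting would look roughly like the sketch below, with the filter applied after the record batch stream already exists. The types and method are hypothetical stand-ins. As the author explains above, this only works if original row numbers survive into the stream, which they do not once ParquetRecordBatchStream has applied predicate pushdown and row selection.

```rust
use futures::{Stream, StreamExt};

// Hypothetical stand-ins, not the real iceberg-rust types.
struct RecordBatch;
struct DeleteFilter;

impl DeleteFilter {
    // The suggested `deleteFilter.filter(recordBatchStream)` shape: a pure
    // post-hoc filter over already-materialised batches.
    fn filter(
        &self,
        batches: impl Stream<Item = RecordBatch>,
    ) -> impl Stream<Item = RecordBatch> {
        // A real implementation would need each row's original position in the
        // data file to apply positional deletes, information that is already gone
        // at this point in the current reader design.
        batches.filter(|_batch| async { true })
    }
}

fn main() {}
```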

self.file_io.clone(),
self.concurrency_limit_data_files,
),
@@ -126,7 +126,7 @@ impl ArrowReaderBuilder {
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
delete_file_manager: CachingDeleteFileManager,
delete_file_loader: CachingDeleteFileLoader,

/// the maximum number of data files that can be fetched at the same time
concurrency_limit_data_files: usize,
@@ -153,7 +153,7 @@ impl ArrowReader {
task,
batch_size,
file_io,
self.delete_file_manager.clone(),
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
)
@@ -167,26 +167,26 @@ impl ArrowReader {
Ok(Box::pin(stream) as ArrowRecordBatchStream)
}

#[allow(clippy::too_many_arguments)]
async fn process_file_scan_task(
task: FileScanTask,
batch_size: Option<usize>,
file_io: FileIO,
delete_file_manager: CachingDeleteFileManager,
delete_file_loader: CachingDeleteFileLoader,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
) -> Result<ArrowRecordBatchStream> {
let should_load_page_index =
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

// concurrently retrieve delete files and create RecordBatchStreamBuilder
let (_, mut record_batch_stream_builder) = try_join!(
delete_file_manager.load_deletes(task.deletes.clone()),
Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
)
)?;
let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());

let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
)
.await?;

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
@@ -208,7 +208,8 @@ impl ArrowReader {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;
let delete_filter = delete_filter_rx.await.unwrap()?;
let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?;
Contributor:
We should hide the building of this filter inside DeleteFilter rather than calling it directly in ArrowReader.

Contributor Author:
Please can you explain a bit more? I don't understand what you're asking for here. As per the previous comment, we're relying on ParquetRecordBatchStream to do most of the heavy lifting for us across a number of features, so the design of the DeleteFilter can't be as simple as exposing a single method that filters a record batch stream.


// In addition to the optional predicate supplied in the `FileScanTask`,
// we also have an optional predicate resulting from equality delete files.
@@ -276,15 +277,18 @@ impl ArrowReader {
}
}

let positional_delete_indexes =
delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path);
let positional_delete_indexes = delete_filter.get_delete_vector(&task);

if let Some(positional_delete_indexes) = positional_delete_indexes {
let delete_row_selection = Self::build_deletes_row_selection(
record_batch_stream_builder.metadata().row_groups(),
&selected_row_group_indices,
positional_delete_indexes.as_ref(),
)?;
let delete_row_selection = {
let positional_delete_indexes = positional_delete_indexes.lock().unwrap();

Self::build_deletes_row_selection(
record_batch_stream_builder.metadata().row_groups(),
&selected_row_group_indices,
&positional_delete_indexes,
)
}?;

// merge the row selection from the delete files with the row selection
// from the filter predicate, if there is one from the filter predicate
@@ -319,7 +323,7 @@ impl ArrowReader {
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

async fn create_parquet_record_batch_stream_builder(
pub(crate) async fn create_parquet_record_batch_stream_builder(
data_file_path: &str,
file_io: FileIO,
should_load_page_index: bool,
@@ -1839,7 +1843,7 @@ message schema {

/* cases to cover:
* {skip|select} {first|intermediate|last} {one row|multiple rows} in
{first|imtermediate|last} {skipped|selected} row group
{first|intermediate|last} {skipped|selected} row group
* row group selection disabled
*/

76 changes: 34 additions & 42 deletions crates/iceberg/src/delete_file_index.rs
@@ -16,29 +16,26 @@
// under the License.

use std::collections::HashMap;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use futures::StreamExt;
use futures::channel::mpsc::{Sender, channel};
use tokio::sync::Notify;

use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};
use crate::{Error, ErrorKind, Result};

/// Index of delete files
#[derive(Clone, Debug)]
#[derive(Debug, Clone)]
pub(crate) struct DeleteFileIndex {
state: Arc<RwLock<DeleteFileIndexState>>,
}

#[derive(Debug)]
enum DeleteFileIndexState {
Populating,
Populating(Arc<Notify>),
Populated(PopulatedDeleteFileIndex),
}

@@ -59,7 +56,10 @@ impl DeleteFileIndex {
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
// TODO: what should the channel limit be?
let (tx, rx) = channel(10);
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
let notify = Arc::new(Notify::new());
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
notify.clone(),
)));
let delete_file_stream = rx.boxed();

spawn({
@@ -69,26 +69,41 @@ impl DeleteFileIndex {

let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);

let mut guard = state.write().unwrap();
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
{
let mut guard = state.write().unwrap();
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
}
notify.notify_waiters();
}
});

(DeleteFileIndex { state }, tx)
}

/// Gets all the delete files that apply to the specified data file.
///
/// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
pub(crate) fn get_deletes_for_data_file<'a>(
pub(crate) async fn get_deletes_for_data_file(
&self,
data_file: &'a DataFile,
data_file: &DataFile,
seq_num: Option<i64>,
) -> DeletesForDataFile<'a> {
DeletesForDataFile {
state: self.state.clone(),
data_file,
seq_num,
) -> Vec<FileScanTaskDeleteFile> {
let notifier = {
let guard = self.state.read().unwrap();
match *guard {
DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
DeleteFileIndexState::Populated(ref index) => {
return index.get_deletes_for_data_file(data_file, seq_num);
}
}
};

notifier.notified().await;

let guard = self.state.read().unwrap();
match guard.deref() {
DeleteFileIndexState::Populated(index) => {
index.get_deletes_for_data_file(data_file, seq_num)
}
_ => unreachable!("Cannot be any other state than loaded"),
}
}
}
@@ -99,7 +114,7 @@ impl PopulatedDeleteFileIndex {
///
/// 1. The partition information is extracted from each delete file's manifest entry.
/// 2. If the partition is empty and the delete file is not a positional delete,
/// it is added to the `global_delees` vector
/// it is added to the `global_deletes` vector
/// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
@@ -193,26 +208,3 @@ impl PopulatedDeleteFileIndex {
results
}
}

/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
pub(crate) struct DeletesForDataFile<'a> {
state: Arc<RwLock<DeleteFileIndexState>>,
data_file: &'a DataFile,
seq_num: Option<i64>,
}

impl Future for DeletesForDataFile<'_> {
type Output = Result<Vec<FileScanTaskDeleteFile>>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.state.try_read() {
Ok(guard) => match guard.deref() {
DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
idx.get_deletes_for_data_file(self.data_file, self.seq_num)
)),
_ => Poll::Pending,
},
Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
}
}
}
10 changes: 9 additions & 1 deletion crates/iceberg/src/delete_vector.rs
@@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::BitOrAssign;

use roaring::RoaringTreemap;
use roaring::bitmap::Iter;
use roaring::treemap::BitmapIter;

#[allow(unused)]
#[derive(Debug, Default)]
pub struct DeleteVector {
inner: RoaringTreemap,
}
@@ -103,3 +105,9 @@ impl DeleteVectorIterator<'_> {
inner.bitmap_iter.advance_to(lo);
}
}

impl BitOrAssign for DeleteVector {
fn bitor_assign(&mut self, other: Self) {
self.inner.bitor_assign(&other.inner);
}
}
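
DeleteVector wraps a roaring::RoaringTreemap, so the new BitOrAssign impl above is just an in-place treemap union. A small sketch of that behaviour using the treemap directly (the positions are made up for illustration):

```rust
use roaring::RoaringTreemap;

fn main() {
    // Positions deleted from the same data file by two different delete files.
    let mut acc = RoaringTreemap::new();
    acc.insert(3);
    acc.insert(1022);

    let mut more = RoaringTreemap::new();
    more.insert(3);
    more.insert(5);

    // In-place union, which is what `*entry.lock().unwrap() |= delete_vector;`
    // in DeleteFilter::upsert_delete_vector does through the new impl.
    acc |= &more;

    // Duplicate positions collapse; the result stays sorted.
    assert_eq!(acc.iter().collect::<Vec<u64>>(), vec![3, 5, 1022]);
}
```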
40 changes: 16 additions & 24 deletions crates/iceberg/src/scan/context.rs
@@ -45,7 +45,7 @@ pub(crate) struct ManifestFileContext {
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: Option<DeleteFileIndex>,
delete_file_index: DeleteFileIndex,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext {
pub bound_predicates: Option<Arc<BoundPredicates>>,
pub partition_spec_id: i32,
pub snapshot_schema: SchemaRef,
pub delete_file_index: Option<DeleteFileIndex>,
pub delete_file_index: DeleteFileIndex,
}

impl ManifestFileContext {
@@ -105,16 +105,13 @@ impl ManifestEntryContext {
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
/// created from it
pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
let deletes = if let Some(delete_file_index) = self.delete_file_index {
delete_file_index
.get_deletes_for_data_file(
self.manifest_entry.data_file(),
self.manifest_entry.sequence_number(),
)
.await?
} else {
vec![]
};
let deletes = self
.delete_file_index
.get_deletes_for_data_file(
self.manifest_entry.data_file(),
self.manifest_entry.sequence_number(),
)
.await;

Ok(FileScanTask {
start: 0,
@@ -188,24 +185,19 @@ impl PlanContext {
&self,
manifest_list: Arc<ManifestList>,
tx_data: Sender<ManifestEntryContext>,
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
delete_file_idx: DeleteFileIndex,
delete_file_tx: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
let manifest_files = manifest_list.entries().iter();

// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];

for manifest_file in manifest_files {
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
continue;
};
(Some(delete_file_idx.clone()), tx.clone())
let tx = if manifest_file.content == ManifestContentType::Deletes {
delete_file_tx.clone()
} else {
(
delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
tx_data.clone(),
)
tx_data.clone()
};

let partition_bound_predicate = if self.predicate.is_some() {
@@ -233,7 +225,7 @@ impl PlanContext {
manifest_file,
partition_bound_predicate,
tx,
delete_file_idx,
delete_file_idx.clone(),
);

filtered_mfcs.push(Ok(mfc));
@@ -247,7 +239,7 @@ impl PlanContext {
manifest_file: &ManifestFile,
partition_filter: Option<Arc<BoundPredicate>>,
sender: Sender<ManifestEntryContext>,
delete_file_index: Option<DeleteFileIndex>,
delete_file_index: DeleteFileIndex,
) -> ManifestFileContext {
let bound_predicates =
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
80 changes: 25 additions & 55 deletions crates/iceberg/src/scan/mod.rs
@@ -59,11 +59,6 @@ pub struct TableScanBuilder<'a> {
concurrency_limit_manifest_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,

// TODO: defaults to false for now whilst delete file processing
// is still being worked on but will switch to a default of true
// once this work is complete
delete_file_processing_enabled: bool,
}

impl<'a> TableScanBuilder<'a> {
@@ -82,7 +77,6 @@ impl<'a> TableScanBuilder<'a> {
concurrency_limit_manifest_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
delete_file_processing_enabled: false,
}
}

@@ -189,17 +183,6 @@ impl<'a> TableScanBuilder<'a> {
self
}

/// Determines whether to enable delete file processing (currently disabled by default)
///
/// When disabled, delete files are ignored.
pub fn with_delete_file_processing_enabled(
mut self,
delete_file_processing_enabled: bool,
) -> Self {
self.delete_file_processing_enabled = delete_file_processing_enabled;
self
}

/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
@@ -226,7 +209,6 @@ impl<'a> TableScanBuilder<'a> {
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
delete_file_processing_enabled: self.delete_file_processing_enabled,
});
};
current_snapshot_id.clone()
@@ -317,7 +299,6 @@ impl<'a> TableScanBuilder<'a> {
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
delete_file_processing_enabled: self.delete_file_processing_enabled,
})
}
}
@@ -346,7 +327,6 @@ pub struct TableScan {

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
delete_file_processing_enabled: bool,
}

impl TableScan {
@@ -368,12 +348,7 @@ impl TableScan {
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
if self.delete_file_processing_enabled {
Some(DeleteFileIndex::new())
} else {
None
};
let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();

let manifest_list = plan_context.get_manifest_list().await?;

@@ -383,9 +358,8 @@ impl TableScan {
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
manifest_list,
manifest_entry_data_ctx_tx,
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
}),
delete_file_idx.clone(),
manifest_entry_delete_ctx_tx,
)?;

let mut channel_for_manifest_error = file_scan_task_tx.clone();
@@ -404,34 +378,30 @@ impl TableScan {
});

let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();

// Process the delete file [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_delete_ctx_rx
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_delete_manifest_entry(manifest_entry_context, tx)
.await
})
.await
},
)
.await;
// Process the delete file [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_delete_ctx_rx
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
})
.await
},
)
.await;

if let Err(error) = result {
let _ = channel_for_delete_manifest_entry_error
.send(Err(error))
.await;
}
})
.await;
}
if let Err(error) = result {
let _ = channel_for_delete_manifest_entry_error
.send(Err(error))
.await;
}
})
.await;

// Process the data file [`ManifestEntry`] stream in parallel
spawn(async move {
@@ -37,11 +37,7 @@ async fn test_read_table_with_positional_deletes() {
.await
.unwrap();

let scan = table
.scan()
.with_delete_file_processing_enabled(true)
.build()
.unwrap();
let scan = table.scan().build().unwrap();
println!("{:?}", scan);

let plan: Vec<_> = scan