Skip to content

Commit 3c5a38f

Browse files
committed
refactor: disabling delete file support will error in the read phase instead of the plan phase if a delete file is encountered
1 parent 5739a46 commit 3c5a38f

File tree

5 files changed

+55
-81
lines changed

5 files changed

+55
-81
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

+3
Original file line numberDiff line numberDiff line change
@@ -574,18 +574,21 @@ mod tests {
574574
file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
575575
file_type: DataContentType::PositionDeletes,
576576
partition_spec_id: 0,
577+
equality_ids: vec![],
577578
};
578579

579580
let pos_del_2 = FileScanTaskDeleteFile {
580581
file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
581582
file_type: DataContentType::PositionDeletes,
582583
partition_spec_id: 0,
584+
equality_ids: vec![],
583585
};
584586

585587
let pos_del_3 = FileScanTaskDeleteFile {
586588
file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
587589
file_type: DataContentType::PositionDeletes,
588590
partition_spec_id: 0,
591+
equality_ids: vec![],
589592
};
590593

591594
let file_scan_tasks = vec![

crates/iceberg/src/arrow/reader.rs

+2-9
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,7 @@ impl ArrowReader {
200200

201201
// concurrently retrieve delete files and create RecordBatchStreamBuilder
202202
let (_, mut record_batch_stream_builder) = try_join!(
203-
delete_file_manager.load_deletes(
204-
if delete_file_support_enabled {
205-
&task.deletes
206-
} else {
207-
&[]
208-
},
209-
task.schema.clone()
210-
),
203+
delete_file_manager.load_deletes(&task.deletes, task.schema.clone()),
211204
Self::create_parquet_record_batch_stream_builder(
212205
&task.data_file_path,
213206
file_io.clone(),
@@ -1749,7 +1742,7 @@ message schema {
17491742

17501743
/* cases to cover:
17511744
* {skip|select} {first|intermediate|last} {one row|multiple rows} in
1752-
{first|imtermediate|last} {skipped|selected} row group
1745+
{first|intermediate|last} {skipped|selected} row group
17531746
* row group selection disabled
17541747
*/
17551748

crates/iceberg/src/scan/context.rs

+16-24
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub(crate) struct ManifestFileContext {
4545
object_cache: Arc<ObjectCache>,
4646
snapshot_schema: SchemaRef,
4747
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
48-
delete_file_index: Option<DeleteFileIndex>,
48+
delete_file_index: DeleteFileIndex,
4949
}
5050

5151
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext {
5858
pub bound_predicates: Option<Arc<BoundPredicates>>,
5959
pub partition_spec_id: i32,
6060
pub snapshot_schema: SchemaRef,
61-
pub delete_file_index: Option<DeleteFileIndex>,
61+
pub delete_file_index: DeleteFileIndex,
6262
}
6363

6464
impl ManifestFileContext {
@@ -105,16 +105,13 @@ impl ManifestEntryContext {
105105
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
106106
/// created from it
107107
pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
108-
let deletes = if let Some(delete_file_index) = self.delete_file_index {
109-
delete_file_index
110-
.get_deletes_for_data_file(
111-
self.manifest_entry.data_file(),
112-
self.manifest_entry.sequence_number(),
113-
)
114-
.await?
115-
} else {
116-
vec![]
117-
};
108+
let deletes = self
109+
.delete_file_index
110+
.get_deletes_for_data_file(
111+
self.manifest_entry.data_file(),
112+
self.manifest_entry.sequence_number(),
113+
)
114+
.await?;
118115

119116
Ok(FileScanTask {
120117
start: 0,
@@ -188,24 +185,19 @@ impl PlanContext {
188185
&self,
189186
manifest_list: Arc<ManifestList>,
190187
tx_data: Sender<ManifestEntryContext>,
191-
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
188+
delete_file_idx: DeleteFileIndex,
189+
tx_delete: Sender<ManifestEntryContext>,
192190
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
193191
let manifest_files = manifest_list.entries().iter();
194192

195193
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
196194
let mut filtered_mfcs = vec![];
197195

198196
for manifest_file in manifest_files {
199-
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
200-
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
201-
continue;
202-
};
203-
(Some(delete_file_idx.clone()), tx.clone())
197+
let tx = if manifest_file.content == ManifestContentType::Deletes {
198+
tx_delete.clone()
204199
} else {
205-
(
206-
delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
207-
tx_data.clone(),
208-
)
200+
tx_data.clone()
209201
};
210202

211203
let partition_bound_predicate = if self.predicate.is_some() {
@@ -233,7 +225,7 @@ impl PlanContext {
233225
manifest_file,
234226
partition_bound_predicate,
235227
tx,
236-
delete_file_idx,
228+
delete_file_idx.clone(),
237229
);
238230

239231
filtered_mfcs.push(Ok(mfc));
@@ -247,7 +239,7 @@ impl PlanContext {
247239
manifest_file: &ManifestFile,
248240
partition_filter: Option<Arc<BoundPredicate>>,
249241
sender: Sender<ManifestEntryContext>,
250-
delete_file_index: Option<DeleteFileIndex>,
242+
delete_file_index: DeleteFileIndex,
251243
) -> ManifestFileContext {
252244
let bound_predicates =
253245
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =

crates/iceberg/src/scan/mod.rs

+27-36
Original file line numberDiff line numberDiff line change
@@ -368,12 +368,7 @@ impl TableScan {
368368
// used to stream the results back to the caller
369369
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
370370

371-
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
372-
if self.delete_file_processing_enabled {
373-
Some(DeleteFileIndex::new())
374-
} else {
375-
None
376-
};
371+
let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
377372

378373
let manifest_list = plan_context.get_manifest_list().await?;
379374

@@ -383,9 +378,8 @@ impl TableScan {
383378
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
384379
manifest_list,
385380
manifest_entry_data_ctx_tx,
386-
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
387-
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
388-
}),
381+
delete_file_idx.clone(),
382+
manifest_entry_delete_ctx_tx,
389383
)?;
390384

391385
let mut channel_for_manifest_error = file_scan_task_tx.clone();
@@ -404,34 +398,30 @@ impl TableScan {
404398
});
405399

406400
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
401+
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
407402

408-
if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
409-
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
410-
411-
// Process the delete file [`ManifestEntry`] stream in parallel
412-
spawn(async move {
413-
let result = manifest_entry_delete_ctx_rx
414-
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
415-
.try_for_each_concurrent(
416-
concurrency_limit_manifest_entries,
417-
|(manifest_entry_context, tx)| async move {
418-
spawn(async move {
419-
Self::process_delete_manifest_entry(manifest_entry_context, tx)
420-
.await
421-
})
422-
.await
423-
},
424-
)
425-
.await;
403+
// Process the delete file [`ManifestEntry`] stream in parallel
404+
spawn(async move {
405+
let result = manifest_entry_delete_ctx_rx
406+
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
407+
.try_for_each_concurrent(
408+
concurrency_limit_manifest_entries,
409+
|(manifest_entry_context, tx)| async move {
410+
spawn(async move {
411+
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
412+
})
413+
.await
414+
},
415+
)
416+
.await;
426417

427-
if let Err(error) = result {
428-
let _ = channel_for_delete_manifest_entry_error
429-
.send(Err(error))
430-
.await;
431-
}
432-
})
433-
.await;
434-
}
418+
if let Err(error) = result {
419+
let _ = channel_for_delete_manifest_entry_error
420+
.send(Err(error))
421+
.await;
422+
}
423+
})
424+
.await;
435425

436426
// Process the data file [`ManifestEntry`] stream in parallel
437427
spawn(async move {
@@ -461,7 +451,8 @@ impl TableScan {
461451
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
462452
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
463453
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
464-
.with_row_selection_enabled(self.row_selection_enabled);
454+
.with_row_selection_enabled(self.row_selection_enabled)
455+
.with_delete_file_support_enabled(self.delete_file_processing_enabled);
465456

466457
if let Some(batch_size) = self.batch_size {
467458
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

+7-12
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use iceberg_catalog_rest::RestCatalog;
2525
use crate::get_shared_containers;
2626

2727
#[tokio::test]
28-
async fn test_read_table_with_positional_deletes() {
28+
async fn test_read_table_with_positional_deletes_with_delete_support_disabled() {
2929
let fixture = get_shared_containers();
3030
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
3131

@@ -39,7 +39,7 @@ async fn test_read_table_with_positional_deletes() {
3939

4040
let scan = table
4141
.scan()
42-
.with_delete_file_processing_enabled(true)
42+
.with_delete_file_processing_enabled(false)
4343
.build()
4444
.unwrap();
4545
println!("{:?}", scan);
@@ -53,19 +53,14 @@ async fn test_read_table_with_positional_deletes() {
5353
.unwrap();
5454
println!("{:?}", plan);
5555

56-
// Scan plan phase should include delete files in file plan
57-
// when with_delete_file_processing_enabled == true
56+
// Scan plan phase still includes delete files in file plan
57+
// when with_delete_file_processing_enabled == false. We instead
58+
// fail at the read phase after this.
5859
assert_eq!(plan[0].deletes.len(), 2);
5960

60-
// 😱 If we don't support positional deletes, we should fail when we try to read a table that
61-
// has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
61+
// with delete_file_processing_enabled == false, we should fail when we
62+
// try to read a table that has positional deletes.
6263
let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await;
6364

6465
assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported));
65-
66-
// When we get support for it:
67-
// let batch_stream = scan.to_arrow().await.unwrap();
68-
// let batches: Vec<_> = batch_stream.try_collect().await.is_err();
69-
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
70-
// assert_eq!(num_rows, 10);
7166
}

0 commit comments

Comments
 (0)