Skip to content

Commit 58b0f07

Browse files
committed
feat: schema evolution of equality delete file record batches
1 parent b3b2d32 commit 58b0f07

File tree

2 files changed

+51
-12
lines changed

2 files changed

+51
-12
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ use futures::channel::oneshot;
2525
use futures::future::join_all;
2626
use futures::{StreamExt, TryStreamExt};
2727

28+
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
2829
use crate::arrow::ArrowReader;
2930
use crate::delete_vector::DeleteVector;
3031
use crate::expr::Predicate::AlwaysTrue;
3132
use crate::expr::{Bind, BoundPredicate, Predicate};
3233
use crate::io::FileIO;
3334
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
34-
use crate::spec::DataContentType;
35+
use crate::spec::{DataContentType, Schema, SchemaRef};
3536
use crate::{Error, ErrorKind, Result};
3637

3738
#[allow(unused)]
@@ -164,7 +165,7 @@ impl CachingDeleteFileManager {
164165
/// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
165166
/// channel to store them in the right place in the delete file managers state.
166167
/// * The results of all of these futures are awaited on in parallel with the specified
167-
/// level of concurrency and collected into a vec. We then combine all of the delete
168+
/// level of concurrency and collected into a vec. We then combine all the delete
168169
/// vector maps that resulted from any positional delete or delete vector files into a
169170
/// single map and persist it in the state.
170171
///
@@ -206,19 +207,25 @@ impl CachingDeleteFileManager {
206207
pub(crate) async fn load_deletes(
207208
&self,
208209
delete_file_entries: &[FileScanTaskDeleteFile],
210+
schema: SchemaRef,
209211
) -> Result<()> {
210212
let stream_items = delete_file_entries
211213
.iter()
212-
.map(|t| (t.clone(), self.file_io.clone(), self.state.clone()))
214+
.map(|t| (
215+
t.clone(),
216+
self.file_io.clone(),
217+
self.state.clone(),
218+
schema.clone(),
219+
))
213220
.collect::<Vec<_>>();
214221
// NOTE: removing the collect and just passing the iterator to futures::stream:iter
215222
// results in an error 'implementation of `std::ops::FnOnce` is not general enough'
216223

217224
let task_stream = futures::stream::iter(stream_items.into_iter());
218225

219226
let results: Vec<ParsedDeleteFileContext> = task_stream
220-
.map(move |(task, file_io, state_ref)| async {
221-
Self::load_file_for_task(task, file_io, state_ref).await
227+
.map(move |(task, file_io, state_ref, schema)| async {
228+
Self::load_file_for_task(task, file_io, state_ref, schema).await
222229
})
223230
.map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await }))
224231
.try_buffer_unordered(self.concurrency_limit_data_files)
@@ -248,6 +255,7 @@ impl CachingDeleteFileManager {
248255
task: FileScanTaskDeleteFile,
249256
file_io: FileIO,
250257
state: StateRef,
258+
schema: SchemaRef,
251259
) -> Result<DeleteFileContext> {
252260
match task.file_type {
253261
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
@@ -271,7 +279,11 @@ impl CachingDeleteFileManager {
271279
};
272280

273281
Ok(DeleteFileContext::FreshEqDel {
274-
batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
282+
batch_stream: Self::evolve_schema(
283+
Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
284+
schema,
285+
)
286+
.await?,
275287
sender,
276288
})
277289
}
@@ -351,6 +363,30 @@ impl CachingDeleteFileManager {
351363
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
352364
}
353365

366+
/// Evolves the schema of the RecordBatches from an equality delete file
367+
async fn evolve_schema(
368+
record_batch_stream: ArrowRecordBatchStream,
369+
target_schema: Arc<Schema>,
370+
) -> Result<ArrowRecordBatchStream> {
371+
let eq_ids = target_schema
372+
.as_ref()
373+
.field_id_to_name_map()
374+
.keys()
375+
.cloned()
376+
.collect::<Vec<_>>();
377+
378+
let mut record_batch_transformer =
379+
RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
380+
381+
let record_batch_stream = record_batch_stream.map(move |record_batch| {
382+
record_batch.and_then(|record_batch|
383+
record_batch_transformer.process_record_batch(record_batch)
384+
)
385+
});
386+
387+
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
388+
}
389+
354390
/// Parses a record batch stream coming from positional delete files
355391
///
356392
/// Returns a map of data file path to a delete vector
@@ -483,7 +519,7 @@ mod tests {
483519
let file_scan_tasks = setup(table_location);
484520

485521
let result = delete_file_manager
486-
.load_deletes(&file_scan_tasks[0].deletes)
522+
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
487523
.await;
488524

489525
assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));

crates/iceberg/src/arrow/reader.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,14 @@ impl ArrowReader {
200200

201201
// concurrently retrieve delete files and create RecordBatchStreamBuilder
202202
let (_, mut record_batch_stream_builder) = try_join!(
203-
delete_file_manager.load_deletes(if delete_file_support_enabled {
204-
&task.deletes
205-
} else {
206-
&[]
207-
},),
203+
delete_file_manager.load_deletes(
204+
if delete_file_support_enabled {
205+
&task.deletes
206+
} else {
207+
&[]
208+
},
209+
task.schema.clone()
210+
),
208211
Self::create_parquet_record_batch_stream_builder(
209212
&task.data_file_path,
210213
file_io.clone(),

0 commit comments

Comments
 (0)