Scan Delete Support Part 3: ArrowReader::build_deletes_row_selection implementation #951

Merged
3 changes: 1 addition & 2 deletions Cargo.lock

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -83,7 +83,7 @@ port_scanner = "0.1.5"
rand = "0.8.5"
regex = "1.10.5"
reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
roaring = "0.10"
roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" }
Contributor Author:
Waiting on this to be released by roaring-rs so that we don't need the git ref here

rust_decimal = "1.31"
serde = { version = "1.0.204", features = ["rc"] }
serde_bytes = "0.11.15"
4 changes: 3 additions & 1 deletion crates/iceberg/src/arrow/delete_file_manager.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::delete_vector::DeleteVector;
use crate::expr::BoundPredicate;
use crate::io::FileIO;
@@ -85,7 +87,7 @@ impl CachingDeleteFileManager {
pub(crate) fn get_positional_delete_indexes_for_data_file(
&self,
data_file_path: &str,
) -> Option<DeleteVector> {
) -> Option<Arc<DeleteVector>> {
// TODO

None
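Returning the delete vector behind an `Arc` presumably lets a cached delete vector be shared across concurrent reads of the same data file without cloning the underlying bitmap.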
260 changes: 252 additions & 8 deletions crates/iceberg/src/arrow/reader.rs
@@ -33,7 +33,9 @@ use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
@@ -280,7 +282,7 @@ impl ArrowReader {
let delete_row_selection = Self::build_deletes_row_selection(
record_batch_stream_builder.metadata().row_groups(),
&selected_row_group_indices,
positional_delete_indexes,
positional_delete_indexes.as_ref(),
)?;

// merge the row selection from the delete files with the row selection
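The rest of the merge logic is outside this hunk. As a rough sketch of how the two selections could be combined, assuming a hypothetical `predicate_row_selection` produced by the filter predicate and the `parquet` crate's `RowSelection::intersection`:

// Hypothetical sketch, not the PR's actual code: a row is read only if it
// survives both the predicate-derived and the delete-derived selections.
let merged_row_selection = predicate_row_selection.intersection(&delete_row_selection);
record_batch_stream_builder =
    record_batch_stream_builder.with_row_selection(merged_row_selection);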
@@ -342,15 +344,109 @@ impl ArrowReader {
/// Using the row group metadata from the Parquet file, builds a `RowSelection` that rejects
/// rows that are indicated as having been deleted by a positional delete, taking into account
/// any row groups that have been skipped entirely by the filter predicate.
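/// For example, with two row groups of 100 rows each (both selected) and positional
/// deletes at rows {3, 4, 150}, the resulting selection is select(3), skip(2),
/// select(145), skip(1), select(49); adjacent selectors of the same type are merged
/// when the `Vec<RowSelector>` is converted into a `RowSelection`.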
#[allow(unused)]
fn build_deletes_row_selection(
row_group_metadata: &[RowGroupMetaData],
row_group_metadata_list: &[RowGroupMetaData],
selected_row_groups: &Option<Vec<usize>>,
mut positional_deletes: DeleteVector,
positional_deletes: &DeleteVector,
) -> Result<RowSelection> {
// TODO
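// Walk the row groups in order, emitting alternating select/skip runs.
// `current_row_group_base_idx` tracks the global index of the first row in
// the current row group; the delete vector iterator yields deleted row
// indexes in ascending global order.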
let mut results: Vec<RowSelector> = Vec::new();
let mut selected_row_groups_idx = 0;
let mut current_row_group_base_idx: u64 = 0;
let mut delete_vector_iter = positional_deletes.iter();
let mut next_deleted_row_idx_opt = delete_vector_iter.next();

for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
let row_group_num_rows = row_group_metadata.num_rows() as u64;
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;

// if row group selection is enabled,
if let Some(selected_row_groups) = selected_row_groups {
// if we've consumed all the selected row groups, we're done
if selected_row_groups_idx == selected_row_groups.len() {
break;
}

if idx == selected_row_groups[selected_row_groups_idx] {
// we're in a selected row group. Increment selected_row_groups_idx
// so that next time around the for loop we're looking for the next
// selected row group
selected_row_groups_idx += 1;
} else {
// consume any positional deletes that fall within the skipped row
// group so that the delete iterator is positioned at the first delete
// in a subsequent row group
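// (note: `advance_to` appears to be the not-yet-released roaring-rs
// API that the git dependency in Cargo.toml is waiting on)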
delete_vector_iter.advance_to(next_row_group_base_idx);
next_deleted_row_idx_opt = delete_vector_iter.next();

// still increment the current row group base index, but then skip to
// the next row group in the file
current_row_group_base_idx += row_group_num_rows;
continue;
}
}

let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(next_deleted_row_idx) => {
// if the index of the next deleted row is beyond this row group, add a selection for
// the remainder of this row group and skip to the next row group
if next_deleted_row_idx >= next_row_group_base_idx {
results.push(RowSelector::select(row_group_num_rows as usize));
continue;
}

next_deleted_row_idx
}

// If there are no more pos deletes, add a selector for the entirety of this row group.
_ => {
results.push(RowSelector::select(row_group_num_rows as usize));
continue;
}
};

let mut current_idx = current_row_group_base_idx;
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
// `select` all rows that precede the next delete index
if current_idx < next_deleted_row_idx {
let run_length = next_deleted_row_idx - current_idx;
results.push(RowSelector::select(run_length as usize));
current_idx += run_length;
}

// `skip` all consecutive deleted rows in the current row group
let mut run_length = 0;
while next_deleted_row_idx == current_idx
&& next_deleted_row_idx < next_row_group_base_idx
{
run_length += 1;
current_idx += 1;

next_deleted_row_idx_opt = delete_vector_iter.next();
next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(next_deleted_row_idx) => next_deleted_row_idx,
_ => {
// We've processed the final positional delete.
// Conclude the skip and then break so that we select the remaining
// rows in the row group and move on to the next row group
results.push(RowSelector::skip(run_length));
break 'chunks;
}
};
}
if run_length > 0 {
results.push(RowSelector::skip(run_length));
}
}
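// select any rows in this row group that remain after the last delete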

if current_idx < next_row_group_base_idx {
results.push(RowSelector::select(
(next_row_group_base_idx - current_idx) as usize,
));
}

current_row_group_base_idx += row_group_num_rows;
}
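// Converting the Vec<RowSelector> into a RowSelection merges adjacent
// selectors of the same type, so select/skip runs can span row group
// boundaries.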

Ok(RowSelection::default())
Ok(results.into())
}

fn build_field_id_set_and_map(
@@ -1284,15 +1380,19 @@ mod tests {
use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ProjectionMask};
use parquet::basic::Compression;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::properties::WriterProperties;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use roaring::RoaringTreemap;
use tempfile::TempDir;

use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
use crate::delete_vector::DeleteVector;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Predicate, Reference};
use crate::io::FileIO;
@@ -1593,4 +1693,148 @@ message schema {

(file_io, schema, table_location, tmp_dir)
}

#[test]
fn test_build_deletes_row_selection() {
Contributor:
This test is not enough; should we consider using a property test to generate random data to verify it?

Contributor Author:
Good suggestion. Will do.

let schema_descr = get_test_schema_descr();

let mut columns = vec![];
for ptr in schema_descr.columns() {
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
columns.push(column);
}

let row_groups_metadata = vec![
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
];

let selected_row_groups = Some(vec![1, 3]);

/* cases to cover:
 * {skip|select} {first|intermediate|last} {one row|multiple rows} in
   {first|intermediate|last} {skipped|selected} row group
 * row group selection disabled
 (parenthesized pairs in the comments below give the expected skip/select
 run lengths in the selected-row-groups case)
*/

let positional_deletes = RoaringTreemap::from_iter(&[
    1, // in skipped rg0, should be ignored
    3, 4, 5, // run of three consecutive items in skipped rg0
    998, 999, // two consecutive items at end of skipped rg0
    1000, // solitary row at start of selected rg1 (1, 9)
    1010, 1011, 1012, // run of 3 rows in selected rg1 (3, 485)
    1498, 1499, // run of two items at end of selected rg1
    1500, 1501, // run of two items at start of skipped rg2
    1600, // should ignore, in skipped rg2
    1999, // single row at end of skipped rg2
    2000, 2001, // run of two items at start of selected rg3 (4, 98)
    2100, // single row in selected row group 3 (1, 99)
    2200, 2201, 2202, // run of 3 consecutive rows in selected row group 3 (3, 796)
    2999, // single item at end of selected rg3 (1)
    3000, // single item at start of skipped rg4
]);

let positional_deletes = DeleteVector::new(positional_deletes);

// using selected row groups 1 and 3
let result = ArrowReader::build_deletes_row_selection(
&row_groups_metadata,
&selected_row_groups,
&positional_deletes,
)
.unwrap();

let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(9),
RowSelector::skip(3),
RowSelector::select(485),
RowSelector::skip(4),
RowSelector::select(98),
RowSelector::skip(1),
RowSelector::select(99),
RowSelector::skip(3),
RowSelector::select(796),
RowSelector::skip(1),
]);

assert_eq!(result, expected);

// selecting all row groups
let result = ArrowReader::build_deletes_row_selection(
&row_groups_metadata,
&None,
&positional_deletes,
)
.unwrap();

let expected = RowSelection::from(vec![
RowSelector::select(1),
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(3),
RowSelector::select(992),
RowSelector::skip(3),
RowSelector::select(9),
RowSelector::skip(3),
RowSelector::select(485),
RowSelector::skip(4),
RowSelector::select(98),
RowSelector::skip(1),
RowSelector::select(398),
RowSelector::skip(3),
RowSelector::select(98),
RowSelector::skip(1),
RowSelector::select(99),
RowSelector::skip(3),
RowSelector::select(796),
RowSelector::skip(2),
RowSelector::select(499),
]);

assert_eq!(result, expected);
}
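Following up on the property-test suggestion in the review thread above: a minimal sketch of what such a test could look like, assuming `proptest` as a dev-dependency and reusing this module's `build_test_row_group_meta` and `get_test_schema_descr` helpers. This is a hypothetical illustration, not part of the PR:

use proptest::prelude::*;

proptest! {
    #[test]
    fn build_deletes_row_selection_keeps_exactly_the_undeleted_rows(
        deletes in proptest::collection::btree_set(0u64..2000, 0..100usize)
    ) {
        let schema_descr = get_test_schema_descr();
        let columns: Vec<_> = schema_descr
            .columns()
            .iter()
            .map(|ptr| ColumnChunkMetaData::builder(ptr.clone()).build().unwrap())
            .collect();

        // two row groups of 1000 rows each; no row group selection
        let row_groups_metadata = vec![
            build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
            build_test_row_group_meta(schema_descr.clone(), columns, 1000, 1),
        ];

        let positional_deletes =
            DeleteVector::new(RoaringTreemap::from_iter(deletes.iter().copied()));

        let selection = ArrowReader::build_deletes_row_selection(
            &row_groups_metadata,
            &None,
            &positional_deletes,
        )
        .unwrap();

        // all deletes fall in [0, 2000), so the selection must retain
        // exactly the rows that were not deleted
        prop_assert_eq!(selection.row_count(), 2000 - deletes.len());
    }
}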

fn build_test_row_group_meta(
schema_descr: SchemaDescPtr,
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
ordinal: i16,
) -> RowGroupMetaData {
RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(num_rows)
.set_total_byte_size(2000)
.set_column_metadata(columns)
.set_ordinal(ordinal)
.build()
.unwrap()
}

fn get_test_schema_descr() -> SchemaDescPtr {
use parquet::schema::types::Type as SchemaType;

let schema = SchemaType::group_type_builder("schema")
.with_fields(vec![
Arc::new(
SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
.build()
.unwrap(),
),
Arc::new(
SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
.build()
.unwrap(),
),
])
.build()
.unwrap();

Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
}