Skip to content

Commit 4163e70

Browse files
committed
feat: implementation for ArrowReader::build_deletes_row_selection
1 parent 58eda97 commit 4163e70

File tree

1 file changed

+245
-5
lines changed

1 file changed

+245
-5
lines changed

crates/iceberg/src/arrow/reader.rs

+245-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use bytes::Bytes;
3333
use fnv::FnvHashSet;
3434
use futures::future::BoxFuture;
3535
use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
36-
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
36+
use parquet::arrow::arrow_reader::{
37+
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
38+
};
3739
use parquet::arrow::async_reader::AsyncFileReader;
3840
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
3941
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
@@ -342,15 +344,104 @@ impl ArrowReader {
342344
/// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
343345
/// as having been deleted by a positional delete, taking into account any row groups that have
344346
/// been skipped entirely by the filter predicate
345-
#[allow(unused)]
346347
fn build_deletes_row_selection(
347348
row_group_metadata: &[RowGroupMetaData],
348349
selected_row_groups: &Option<Vec<usize>>,
349350
mut positional_deletes: DeleteVector,
350351
) -> Result<RowSelection> {
351-
// TODO
352+
let mut results: Vec<RowSelector> = Vec::new();
353+
let mut selected_row_groups_idx = 0;
354+
let mut current_page_base_idx: u64 = 0;
355+
356+
for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() {
357+
let page_num_rows = row_group_metadata.num_rows() as u64;
358+
let next_page_base_idx = current_page_base_idx + page_num_rows;
359+
360+
// if row group selection is enabled,
361+
if let Some(selected_row_groups) = selected_row_groups {
362+
// if we've consumed all the selected row groups, we're done
363+
if selected_row_groups_idx == selected_row_groups.len() {
364+
break;
365+
}
366+
367+
if idx == selected_row_groups[selected_row_groups_idx] {
368+
// we're in a selected row group. Increment selected_row_groups_idx
369+
// so that next time around the for loop we're looking for the next
370+
// selected row group
371+
selected_row_groups_idx += 1;
372+
} else {
373+
// remove any positional deletes from the skipped page so that
374+
// `positional.deletes.min()` can be used
375+
positional_deletes.remove_range(current_page_base_idx..next_page_base_idx);
376+
377+
// still increment the current page base index but then skip to the next row group
378+
// in the file
379+
current_page_base_idx += page_num_rows;
380+
continue;
381+
}
382+
}
383+
384+
let mut next_deleted_row_idx = match positional_deletes.min() {
385+
Some(next_deleted_row_idx) => {
386+
// if the index of the next deleted row is beyond this page, add a selection for
387+
// the remainder of this page and skip to the next page
388+
if next_deleted_row_idx >= next_page_base_idx {
389+
results.push(RowSelector::select(page_num_rows as usize));
390+
continue;
391+
}
392+
393+
next_deleted_row_idx
394+
}
395+
396+
// If there are no more pos deletes, add a selector for the entirety of this page.
397+
_ => {
398+
results.push(RowSelector::select(page_num_rows as usize));
399+
continue;
400+
}
401+
};
402+
403+
let mut current_idx = current_page_base_idx;
404+
'chunks: while next_deleted_row_idx < next_page_base_idx {
405+
// `select` all rows that precede the next delete index
406+
if current_idx < next_deleted_row_idx {
407+
let run_length = next_deleted_row_idx - current_idx;
408+
results.push(RowSelector::select(run_length as usize));
409+
current_idx += run_length;
410+
}
411+
412+
// `skip` all consecutive deleted rows in the current row group
413+
let mut run_length = 0;
414+
while next_deleted_row_idx == current_idx
415+
&& next_deleted_row_idx < next_page_base_idx
416+
{
417+
run_length += 1;
418+
current_idx += 1;
419+
positional_deletes.remove(next_deleted_row_idx);
420+
421+
next_deleted_row_idx = match positional_deletes.min() {
422+
Some(next_deleted_row_idx) => next_deleted_row_idx,
423+
_ => {
424+
// We've processed the final positional delete.
425+
// Conclude the skip and then break so that we select the remaining
426+
// rows in the page and move on to the next row group
427+
results.push(RowSelector::skip(run_length));
428+
break 'chunks;
429+
}
430+
};
431+
}
432+
results.push(RowSelector::skip(run_length));
433+
}
434+
435+
if current_idx < next_page_base_idx {
436+
results.push(RowSelector::select(
437+
(next_page_base_idx - current_idx) as usize,
438+
));
439+
}
440+
441+
current_page_base_idx += page_num_rows;
442+
}
352443

353-
Ok(RowSelection::default())
444+
Ok(results.into())
354445
}
355446

356447
fn build_field_id_set_and_map(
@@ -1287,9 +1378,12 @@ mod tests {
12871378
use parquet::arrow::{ArrowWriter, ProjectionMask};
12881379
use parquet::basic::Compression;
12891380
use parquet::file::properties::WriterProperties;
1381+
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
1382+
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
12901383
use parquet::schema::parser::parse_message_type;
1291-
use parquet::schema::types::SchemaDescriptor;
12921384
use tempfile::TempDir;
1385+
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
1386+
use roaring::RoaringTreemap;
12931387

12941388
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
12951389
use crate::arrow::{ArrowReader, ArrowReaderBuilder};
@@ -1593,4 +1687,150 @@ message schema {
15931687

15941688
(file_io, schema, table_location, tmp_dir)
15951689
}
1690+
1691+
#[test]
1692+
fn test_build_deletes_row_selection() {
1693+
let schema_descr = get_test_schema_descr();
1694+
1695+
let mut columns = vec![];
1696+
for ptr in schema_descr.columns() {
1697+
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
1698+
columns.push(column);
1699+
}
1700+
1701+
let row_groups_metadata = vec![
1702+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0),
1703+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1),
1704+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2),
1705+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3),
1706+
build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4),
1707+
];
1708+
1709+
let selected_row_groups = Some(vec![1, 3]);
1710+
1711+
/* cases to cover:
1712+
* {skip|select} {first|intermediate|last} {one row|multiple rows} in
1713+
{first|imtermediate|last} {skipped|selected} row group
1714+
* row group selection disabled
1715+
*/
1716+
1717+
let positional_deletes = RoaringTreemap::from_iter(&[
1718+
1, // in skipped rg 0, should be ignored
1719+
3, // run of three consecutive items in skipped rg0
1720+
4, 5, 998, // two consecutive items at end of skipped rg0
1721+
999, 1000, // solitary row at start of selected rg1 (1, 9)
1722+
1010, // run of 3 rows in selected rg1
1723+
1011, 1012, // (3, 485)
1724+
1498, // run of two items at end of selected rg1
1725+
1499, 1500, // run of two items at start of skipped rg2
1726+
1501, 1600, // should ignore, in skipped rg2
1727+
1999, // single row at end of skipped rg2
1728+
2000, // run of two items at start of selected rg3
1729+
2001, // (4, 98)
1730+
2100, // single row in selected row group 3 (1, 99)
1731+
2200, // run of 3 consecutive rows in selected row group 3
1732+
2201, 2202, // (3, 796)
1733+
2999, // single item at end of selected rg3 (1)
1734+
3000, // single item at start of skipped rg4
1735+
]);
1736+
1737+
let positional_deletes = DeleteVector {
1738+
inner: positional_deletes
1739+
};
1740+
1741+
// using selected row groups 1 and 3
1742+
let result = ArrowReader::build_deletes_row_selection(
1743+
&row_groups_metadata,
1744+
&selected_row_groups,
1745+
positional_deletes.clone(),
1746+
)
1747+
.unwrap();
1748+
1749+
let expected = RowSelection::from(vec![
1750+
RowSelector::skip(1),
1751+
RowSelector::select(9),
1752+
RowSelector::skip(3),
1753+
RowSelector::select(485),
1754+
RowSelector::skip(4),
1755+
RowSelector::select(98),
1756+
RowSelector::skip(1),
1757+
RowSelector::select(99),
1758+
RowSelector::skip(3),
1759+
RowSelector::select(796),
1760+
RowSelector::skip(1),
1761+
]);
1762+
1763+
assert_eq!(result, expected);
1764+
1765+
// selecting all row groups
1766+
let result = ArrowReader::build_deletes_row_selection(
1767+
&row_groups_metadata,
1768+
&None,
1769+
positional_deletes,
1770+
)
1771+
.unwrap();
1772+
1773+
let expected = RowSelection::from(vec![
1774+
RowSelector::select(1),
1775+
RowSelector::skip(1),
1776+
RowSelector::select(1),
1777+
RowSelector::skip(3),
1778+
RowSelector::select(992),
1779+
RowSelector::skip(3),
1780+
RowSelector::select(9),
1781+
RowSelector::skip(3),
1782+
RowSelector::select(485),
1783+
RowSelector::skip(4),
1784+
RowSelector::select(98),
1785+
RowSelector::skip(1),
1786+
RowSelector::select(398),
1787+
RowSelector::skip(3),
1788+
RowSelector::select(98),
1789+
RowSelector::skip(1),
1790+
RowSelector::select(99),
1791+
RowSelector::skip(3),
1792+
RowSelector::select(796),
1793+
RowSelector::skip(2),
1794+
RowSelector::select(499),
1795+
]);
1796+
1797+
assert_eq!(result, expected);
1798+
}
1799+
1800+
fn build_test_row_group_meta(
1801+
schema_descr: SchemaDescPtr,
1802+
columns: Vec<ColumnChunkMetaData>,
1803+
num_rows: i64,
1804+
ordinal: i16,
1805+
) -> RowGroupMetaData {
1806+
RowGroupMetaData::builder(schema_descr.clone())
1807+
.set_num_rows(num_rows)
1808+
.set_total_byte_size(2000)
1809+
.set_column_metadata(columns)
1810+
.set_ordinal(ordinal)
1811+
.build()
1812+
.unwrap()
1813+
}
1814+
1815+
fn get_test_schema_descr() -> SchemaDescPtr {
1816+
use parquet::schema::types::Type as SchemaType;
1817+
1818+
let schema = SchemaType::group_type_builder("schema")
1819+
.with_fields(vec![
1820+
Arc::new(
1821+
SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32)
1822+
.build()
1823+
.unwrap(),
1824+
),
1825+
Arc::new(
1826+
SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32)
1827+
.build()
1828+
.unwrap(),
1829+
),
1830+
])
1831+
.build()
1832+
.unwrap();
1833+
1834+
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1835+
}
15961836
}

0 commit comments

Comments
 (0)