From 477a6ba9c6c1596d667fd10ab663b8d7f8fdcb9b Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Sat, 8 Feb 2025 12:35:36 +0000 Subject: [PATCH 1/3] feat: implementation for ArrowReader::build_deletes_row_selection --- crates/iceberg/src/arrow/reader.rs | 250 ++++++++++++++++++++++++++++- 1 file changed, 245 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5fe8bd292..7c8571e6d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -33,7 +33,9 @@ use bytes::Bytes; use fnv::FnvHashSet; use futures::future::BoxFuture; use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection}; +use parquet::arrow::arrow_reader::{ + ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, +}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; @@ -342,15 +344,104 @@ impl ArrowReader { /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated /// as having been deleted by a positional delete, taking into account any row groups that have /// been skipped entirely by the filter predicate - #[allow(unused)] fn build_deletes_row_selection( row_group_metadata: &[RowGroupMetaData], selected_row_groups: &Option>, mut positional_deletes: DeleteVector, ) -> Result { - // TODO + let mut results: Vec = Vec::new(); + let mut selected_row_groups_idx = 0; + let mut current_page_base_idx: u64 = 0; + + for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() { + let page_num_rows = row_group_metadata.num_rows() as u64; + let next_page_base_idx = current_page_base_idx + page_num_rows; + + // if row group selection is enabled, + if let Some(selected_row_groups) = selected_row_groups { + // if we've consumed all the selected row groups, we're done + if selected_row_groups_idx == selected_row_groups.len() { + break; + } + + if idx == selected_row_groups[selected_row_groups_idx] { + // we're in a selected row group. Increment selected_row_groups_idx + // so that next time around the for loop we're looking for the next + // selected row group + selected_row_groups_idx += 1; + } else { + // remove any positional deletes from the skipped page so that + // `positional.deletes.min()` can be used + positional_deletes.remove_range(current_page_base_idx..next_page_base_idx); + + // still increment the current page base index but then skip to the next row group + // in the file + current_page_base_idx += page_num_rows; + continue; + } + } + + let mut next_deleted_row_idx = match positional_deletes.min() { + Some(next_deleted_row_idx) => { + // if the index of the next deleted row is beyond this page, add a selection for + // the remainder of this page and skip to the next page + if next_deleted_row_idx >= next_page_base_idx { + results.push(RowSelector::select(page_num_rows as usize)); + continue; + } + + next_deleted_row_idx + } + + // If there are no more pos deletes, add a selector for the entirety of this page. + _ => { + results.push(RowSelector::select(page_num_rows as usize)); + continue; + } + }; + + let mut current_idx = current_page_base_idx; + 'chunks: while next_deleted_row_idx < next_page_base_idx { + // `select` all rows that precede the next delete index + if current_idx < next_deleted_row_idx { + let run_length = next_deleted_row_idx - current_idx; + results.push(RowSelector::select(run_length as usize)); + current_idx += run_length; + } + + // `skip` all consecutive deleted rows in the current row group + let mut run_length = 0; + while next_deleted_row_idx == current_idx + && next_deleted_row_idx < next_page_base_idx + { + run_length += 1; + current_idx += 1; + positional_deletes.remove(next_deleted_row_idx); + + next_deleted_row_idx = match positional_deletes.min() { + Some(next_deleted_row_idx) => next_deleted_row_idx, + _ => { + // We've processed the final positional delete. + // Conclude the skip and then break so that we select the remaining + // rows in the page and move on to the next row group + results.push(RowSelector::skip(run_length)); + break 'chunks; + } + }; + } + results.push(RowSelector::skip(run_length)); + } + + if current_idx < next_page_base_idx { + results.push(RowSelector::select( + (next_page_base_idx - current_idx) as usize, + )); + } + + current_page_base_idx += page_num_rows; + } - Ok(RowSelection::default()) + Ok(results.into()) } fn build_field_id_set_and_map( @@ -1287,9 +1378,12 @@ mod tests { use parquet::arrow::{ArrowWriter, ProjectionMask}; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use parquet::schema::parser::parse_message_type; - use parquet::schema::types::SchemaDescriptor; use tempfile::TempDir; + use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor}; + use roaring::RoaringTreemap; use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY}; use crate::arrow::{ArrowReader, ArrowReaderBuilder}; @@ -1593,4 +1687,150 @@ message schema { (file_io, schema, table_location, tmp_dir) } + + #[test] + fn test_build_deletes_row_selection() { + let schema_descr = get_test_schema_descr(); + + let mut columns = vec![]; + for ptr in schema_descr.columns() { + let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap(); + columns.push(column); + } + + let row_groups_metadata = vec![ + build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 0), + build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 1), + build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 2), + build_test_row_group_meta(schema_descr.clone(), columns.clone(), 1000, 3), + build_test_row_group_meta(schema_descr.clone(), columns.clone(), 500, 4), + ]; + + let selected_row_groups = Some(vec![1, 3]); + + /* cases to cover: + * {skip|select} {first|intermediate|last} {one row|multiple rows} in + {first|imtermediate|last} {skipped|selected} row group + * row group selection disabled + */ + + let positional_deletes = RoaringTreemap::from_iter(&[ + 1, // in skipped rg 0, should be ignored + 3, // run of three consecutive items in skipped rg0 + 4, 5, 998, // two consecutive items at end of skipped rg0 + 999, 1000, // solitary row at start of selected rg1 (1, 9) + 1010, // run of 3 rows in selected rg1 + 1011, 1012, // (3, 485) + 1498, // run of two items at end of selected rg1 + 1499, 1500, // run of two items at start of skipped rg2 + 1501, 1600, // should ignore, in skipped rg2 + 1999, // single row at end of skipped rg2 + 2000, // run of two items at start of selected rg3 + 2001, // (4, 98) + 2100, // single row in selected row group 3 (1, 99) + 2200, // run of 3 consecutive rows in selected row group 3 + 2201, 2202, // (3, 796) + 2999, // single item at end of selected rg3 (1) + 3000, // single item at start of skipped rg4 + ]); + + let positional_deletes = DeleteVector { + inner: positional_deletes + }; + + // using selected row groups 1 and 3 + let result = ArrowReader::build_deletes_row_selection( + &row_groups_metadata, + &selected_row_groups, + positional_deletes.clone(), + ) + .unwrap(); + + let expected = RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(9), + RowSelector::skip(3), + RowSelector::select(485), + RowSelector::skip(4), + RowSelector::select(98), + RowSelector::skip(1), + RowSelector::select(99), + RowSelector::skip(3), + RowSelector::select(796), + RowSelector::skip(1), + ]); + + assert_eq!(result, expected); + + // selecting all row groups + let result = ArrowReader::build_deletes_row_selection( + &row_groups_metadata, + &None, + positional_deletes, + ) + .unwrap(); + + let expected = RowSelection::from(vec![ + RowSelector::select(1), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(3), + RowSelector::select(992), + RowSelector::skip(3), + RowSelector::select(9), + RowSelector::skip(3), + RowSelector::select(485), + RowSelector::skip(4), + RowSelector::select(98), + RowSelector::skip(1), + RowSelector::select(398), + RowSelector::skip(3), + RowSelector::select(98), + RowSelector::skip(1), + RowSelector::select(99), + RowSelector::skip(3), + RowSelector::select(796), + RowSelector::skip(2), + RowSelector::select(499), + ]); + + assert_eq!(result, expected); + } + + fn build_test_row_group_meta( + schema_descr: SchemaDescPtr, + columns: Vec, + num_rows: i64, + ordinal: i16, + ) -> RowGroupMetaData { + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(num_rows) + .set_total_byte_size(2000) + .set_column_metadata(columns) + .set_ordinal(ordinal) + .build() + .unwrap() + } + + fn get_test_schema_descr() -> SchemaDescPtr { + use parquet::schema::types::Type as SchemaType; + + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![ + Arc::new( + SchemaType::primitive_type_builder("a", parquet::basic::Type::INT32) + .build() + .unwrap(), + ), + Arc::new( + SchemaType::primitive_type_builder("b", parquet::basic::Type::INT32) + .build() + .unwrap(), + ), + ]) + .build() + .unwrap(); + + Arc::new(SchemaDescriptor::new(Arc::new(schema))) + } } From 9a3fa7cc8b76eb30a69428ea9a31185be6d3ffdc Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 27 Mar 2025 22:48:22 +0000 Subject: [PATCH 2/3] refactor: delete row selection uses roaring treemap iterator and advance_to --- Cargo.lock | 3 +- Cargo.toml | 2 +- .../iceberg/src/arrow/delete_file_manager.rs | 4 +- crates/iceberg/src/arrow/reader.rs | 70 ++++++------- crates/iceberg/src/delete_vector.rs | 99 +++++++++++-------- 5 files changed, 98 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d013b03d..4b8b9edb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4998,8 +4998,7 @@ checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" [[package]] name = "roaring" version = "0.10.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a652edd001c53df0b3f96a36a8dc93fce6866988efc16808235653c6bcac8bf2" +source = "git+https://github.com/RoaringBitmap/roaring-rs.git#6cfeb8830f181a1f0a9fd9c3423175db6462fb2d" dependencies = [ "bytemuck", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index 730d156cf..777e788e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ port_scanner = "0.1.5" rand = "0.8.5" regex = "1.10.5" reqwest = { version = "0.12.2", default-features = false, features = ["json"] } -roaring = "0.10" +roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" } rust_decimal = "1.31" serde = { version = "1.0.204", features = ["rc"] } serde_bytes = "0.11.15" diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs index 0d46f4768..e1ca47679 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_manager.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::delete_vector::DeleteVector; use crate::expr::BoundPredicate; use crate::io::FileIO; @@ -85,7 +87,7 @@ impl CachingDeleteFileManager { pub(crate) fn get_positional_delete_indexes_for_data_file( &self, data_file_path: &str, - ) -> Option { + ) -> Option> { // TODO None diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 7c8571e6d..195fd7fb4 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -282,7 +282,7 @@ impl ArrowReader { let delete_row_selection = Self::build_deletes_row_selection( record_batch_stream_builder.metadata().row_groups(), &selected_row_group_indices, - positional_delete_indexes, + positional_delete_indexes.as_ref(), )?; // merge the row selection from the delete files with the row selection @@ -345,17 +345,19 @@ impl ArrowReader { /// as having been deleted by a positional delete, taking into account any row groups that have /// been skipped entirely by the filter predicate fn build_deletes_row_selection( - row_group_metadata: &[RowGroupMetaData], + row_group_metadata_list: &[RowGroupMetaData], selected_row_groups: &Option>, - mut positional_deletes: DeleteVector, + positional_deletes: &DeleteVector, ) -> Result { let mut results: Vec = Vec::new(); let mut selected_row_groups_idx = 0; - let mut current_page_base_idx: u64 = 0; + let mut current_row_group_base_idx: u64 = 0; + let mut delete_vector_iter = positional_deletes.iter(); + let mut next_deleted_row_idx_opt = delete_vector_iter.next(); - for (idx, row_group_metadata) in row_group_metadata.iter().enumerate() { - let page_num_rows = row_group_metadata.num_rows() as u64; - let next_page_base_idx = current_page_base_idx + page_num_rows; + for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() { + let row_group_num_rows = row_group_metadata.num_rows() as u64; + let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows; // if row group selection is enabled, if let Some(selected_row_groups) = selected_row_groups { @@ -372,36 +374,37 @@ impl ArrowReader { } else { // remove any positional deletes from the skipped page so that // `positional.deletes.min()` can be used - positional_deletes.remove_range(current_page_base_idx..next_page_base_idx); + delete_vector_iter.advance_to(next_row_group_base_idx); + next_deleted_row_idx_opt = delete_vector_iter.next(); // still increment the current page base index but then skip to the next row group // in the file - current_page_base_idx += page_num_rows; + current_row_group_base_idx += row_group_num_rows; continue; } } - let mut next_deleted_row_idx = match positional_deletes.min() { + let mut next_deleted_row_idx = match next_deleted_row_idx_opt { Some(next_deleted_row_idx) => { - // if the index of the next deleted row is beyond this page, add a selection for - // the remainder of this page and skip to the next page - if next_deleted_row_idx >= next_page_base_idx { - results.push(RowSelector::select(page_num_rows as usize)); + // if the index of the next deleted row is beyond this row group, add a selection for + // the remainder of this row group and skip to the next row group + if next_deleted_row_idx >= next_row_group_base_idx { + results.push(RowSelector::select(row_group_num_rows as usize)); continue; } next_deleted_row_idx } - // If there are no more pos deletes, add a selector for the entirety of this page. + // If there are no more pos deletes, add a selector for the entirety of this row group. _ => { - results.push(RowSelector::select(page_num_rows as usize)); + results.push(RowSelector::select(row_group_num_rows as usize)); continue; } }; - let mut current_idx = current_page_base_idx; - 'chunks: while next_deleted_row_idx < next_page_base_idx { + let mut current_idx = current_row_group_base_idx; + 'chunks: while next_deleted_row_idx < next_row_group_base_idx { // `select` all rows that precede the next delete index if current_idx < next_deleted_row_idx { let run_length = next_deleted_row_idx - current_idx; @@ -412,18 +415,18 @@ impl ArrowReader { // `skip` all consecutive deleted rows in the current row group let mut run_length = 0; while next_deleted_row_idx == current_idx - && next_deleted_row_idx < next_page_base_idx + && next_deleted_row_idx < next_row_group_base_idx { run_length += 1; current_idx += 1; - positional_deletes.remove(next_deleted_row_idx); - next_deleted_row_idx = match positional_deletes.min() { + next_deleted_row_idx_opt = delete_vector_iter.next(); + next_deleted_row_idx = match next_deleted_row_idx_opt { Some(next_deleted_row_idx) => next_deleted_row_idx, _ => { // We've processed the final positional delete. // Conclude the skip and then break so that we select the remaining - // rows in the page and move on to the next row group + // rows in the row group and move on to the next row group results.push(RowSelector::skip(run_length)); break 'chunks; } @@ -432,13 +435,13 @@ impl ArrowReader { results.push(RowSelector::skip(run_length)); } - if current_idx < next_page_base_idx { + if current_idx < next_row_group_base_idx { results.push(RowSelector::select( - (next_page_base_idx - current_idx) as usize, + (next_row_group_base_idx - current_idx) as usize, )); } - current_page_base_idx += page_num_rows; + current_row_group_base_idx += row_group_num_rows; } Ok(results.into()) @@ -1375,18 +1378,19 @@ mod tests { use arrow_array::{ArrayRef, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use futures::TryStreamExt; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::{ArrowWriter, ProjectionMask}; use parquet::basic::Compression; - use parquet::file::properties::WriterProperties; - use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; + use parquet::file::properties::WriterProperties; use parquet::schema::parser::parse_message_type; - use tempfile::TempDir; use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor}; use roaring::RoaringTreemap; + use tempfile::TempDir; use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY}; use crate::arrow::{ArrowReader, ArrowReaderBuilder}; + use crate::delete_vector::DeleteVector; use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; use crate::io::FileIO; @@ -1733,16 +1737,14 @@ message schema { 2999, // single item at end of selected rg3 (1) 3000, // single item at start of skipped rg4 ]); - - let positional_deletes = DeleteVector { - inner: positional_deletes - }; + + let positional_deletes = DeleteVector::new(positional_deletes); // using selected row groups 1 and 3 let result = ArrowReader::build_deletes_row_selection( &row_groups_metadata, &selected_row_groups, - positional_deletes.clone(), + &positional_deletes, ) .unwrap(); @@ -1766,7 +1768,7 @@ message schema { let result = ArrowReader::build_deletes_row_selection( &row_groups_metadata, &None, - positional_deletes, + &positional_deletes, ) .unwrap(); diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index d57f7e4be..6337e312c 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use roaring::bitmap::Iter; +use roaring::treemap::BitmapIter; use roaring::RoaringTreemap; #[allow(unused)] @@ -23,68 +25,81 @@ pub struct DeleteVector { } impl DeleteVector { - pub fn iter(&self) -> DeleteVectorIterator { - let mut iter = self.inner.bitmaps(); - match iter.next() { - Some((high_bits, bitmap)) => { - DeleteVectorIterator { - inner: Some(DeleteVectorIteratorInner { - // iter, - high_bits: (high_bits as u64) << 32, - bitmap_iter: bitmap.iter(), - }), - } - } - _ => DeleteVectorIterator { inner: None }, + #[allow(unused)] + pub(crate) fn new(roaring_treemap: RoaringTreemap) -> DeleteVector { + DeleteVector { + inner: roaring_treemap, } } + + pub fn iter(&self) -> DeleteVectorIterator { + let outer = self.inner.bitmaps(); + DeleteVectorIterator { outer, inner: None } + } } +// Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here. +// But right now, it does not have a corresponding implementation of `roaring::bitmap::Iter::advance_to`, +// which is very handy in ArrowReader::build_deletes_row_selection. +// There is a PR open on roaring to add this (https://github.com/RoaringBitmap/roaring-rs/pull/314) +// and if that gets merged then we can simplify `DeleteVectorIterator` here, refactoring `advance_to` +// to just a wrapper around the underlying iterator's method. pub struct DeleteVectorIterator<'a> { + // NB: `BitMapIter` was only exposed publicly in https://github.com/RoaringBitmap/roaring-rs/pull/316 + // which is not yet released. As a consequence our Cargo.toml temporarily uses a git reference for + // the roaring dependency. + outer: BitmapIter<'a>, inner: Option>, } struct DeleteVectorIteratorInner<'a> { - // TODO: roaring::treemap::iter::BitmapIter is currently private. - // See https://github.com/RoaringBitmap/roaring-rs/issues/312 - // iter: roaring::treemap::iter::BitmapIter<'a>, - high_bits: u64, - bitmap_iter: roaring::bitmap::Iter<'a>, + high_bits: u32, + bitmap_iter: Iter<'a>, } impl Iterator for DeleteVectorIterator<'_> { type Item = u64; fn next(&mut self) -> Option { - let Some(ref mut inner) = &mut self.inner else { - return None; - }; - - if let Some(lower) = inner.bitmap_iter.next() { - return Some(inner.high_bits & lower as u64); - }; - - // TODO: roaring::treemap::iter::BitmapIter is currently private. - // See https://github.com/RoaringBitmap/roaring-rs/issues/312 + if let Some(ref mut inner) = &mut self.inner { + if let Some(inner_next) = inner.bitmap_iter.next() { + return Some(u64::from(inner.high_bits) << 32 | u64::from(inner_next)); + } + } - // replace with commented-out code below once BitmapIter is pub, - // or use RoaringTreemap::iter if `advance_to` gets implemented natively - None + if let Some((high_bits, next_bitmap)) = self.outer.next() { + self.inner = Some(DeleteVectorIteratorInner { + high_bits, + bitmap_iter: next_bitmap.iter(), + }) + } else { + return None; + } - // let Some((high_bits, bitmap)) = inner.iter.next() else { - // self.inner = None; - // return None; - // }; - // - // inner.high_bits = (high_bits as u64) << 32; - // inner.bitmap_iter = bitmap.iter(); - // - // self.next() + self.next() } } impl<'a> DeleteVectorIterator<'a> { - pub fn advance_to(&'a mut self, _pos: u64) { - // TODO + pub fn advance_to(&mut self, pos: u64) { + let hi = (pos >> 32) as u32; + let lo = pos as u32; + + let Some(ref mut inner) = self.inner else { + return; + }; + + while inner.high_bits < hi { + let Some((next_hi, next_bitmap)) = self.outer.next() else { + return; + }; + + *inner = DeleteVectorIteratorInner { + high_bits: next_hi, + bitmap_iter: next_bitmap.iter(), + } + } + + inner.bitmap_iter.advance_to(lo); } } From caeb7206d6f404553ccf5ca3ab4e2c4d2269873a Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 27 Mar 2025 22:54:30 +0000 Subject: [PATCH 3/3] fix: added check to avoid adding empty entry to row selection --- crates/iceberg/src/arrow/reader.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 195fd7fb4..20b4f7ca1 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -432,7 +432,9 @@ impl ArrowReader { } }; } - results.push(RowSelector::skip(run_length)); + if run_length > 0 { + results.push(RowSelector::skip(run_length)); + } } if current_idx < next_row_group_base_idx {