From 9596e291d8ec055daac5c791d40d7076fe119905 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Fri, 11 Mar 2022 16:26:48 +0800
Subject: [PATCH 01/10] Filter parquet row groups by range as well

---
 Cargo.toml                                    |   5 +
 ballista/rust/core/proto/ballista.proto       |   6 +
 .../src/serde/physical_plan/from_proto.rs     |  14 +-
 .../core/src/serde/physical_plan/to_proto.rs  |  14 +-
 datafusion/src/datasource/listing/helpers.rs  |   3 +
 datafusion/src/datasource/mod.rs              |  27 ++-
 .../src/datasource/object_store/local.rs      |   1 +
 .../src/physical_plan/file_format/parquet.rs  | 178 ++++++++----------
 datafusion/tests/parquet_pruning.rs           |  12 +-
 9 files changed, 151 insertions(+), 109 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f7e9c0330e5f..7e9541599893 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,3 +36,8 @@ members = [
 [profile.release]
 lto = true
 codegen-units = 1
+
+[patch.crates-io]
+arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "2bca71e322fcab6c6d93a47ef71638a617e29f6c"}
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "2bca71e322fcab6c6d93a47ef71638a617e29f6c"}
+parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "2bca71e322fcab6c6d93a47ef71638a617e29f6c"}
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index a835229a6057..2056c4b0fea5 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -68,11 +68,17 @@ message Statistics {
   bool is_exact = 4;
 }
 
+message FileRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
 message PartitionedFile {
   string path = 1;
   uint64 size = 2;
   uint64 last_modified_ns = 3;
   repeated datafusion.ScalarValue partition_values = 4;
+  FileRange range = 5;
 }
 
 message CsvFormat {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 8daefc904b7c..8719738e30da 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -29,7 +29,7 @@ use chrono::{TimeZone, Utc};
 use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
-use datafusion::datasource::PartitionedFile;
+use datafusion::datasource::{FileRange, PartitionedFile};
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
@@ -262,6 +262,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&protobuf::FileRange> for FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
+        Ok(FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index c6cfb1ec84c9..08a94a6cad7d 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     Statistics,
 };
 
-use datafusion::datasource::PartitionedFile;
+use datafusion::datasource::{FileRange, PartitionedFile};
 use datafusion::physical_plan::file_format::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, Literal};
@@ -273,6 +273,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&FileRange> for protobuf::FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
+        Ok(protobuf::FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs
index 335d8275fcce..421a086bbd7a 100644
--- a/datafusion/src/datasource/listing/helpers.rs
+++ b/datafusion/src/datasource/listing/helpers.rs
@@ -173,6 +173,7 @@ pub async fn pruned_partition_list(
                 Ok(PartitionedFile {
                     partition_values: vec![],
                     file_meta: f?,
+                    range: None,
                 })
             }),
         ));
@@ -216,6 +217,7 @@ pub async fn pruned_partition_list(
                     Ok(PartitionedFile {
                         partition_values,
                         file_meta,
+                        range: None,
                     })
                 })
             }
@@ -353,6 +355,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Vec<PartitionedFile> {
                         ScalarValue::try_from_array(batch.column(col), row).unwrap()
                     })
                     .collect(),
+                range: None,
             })
         })
         .collect()
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 9a7b17d1a867..81bf204d712b 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -125,14 +125,24 @@ pub async fn get_statistics_with_limit(
 }
 
 #[derive(Debug, Clone)]
-/// A single file that should be read, along with its schema, statistics
+/// File part identified by [start, end) positions.
+pub struct FileRange {
+    /// Range start
+    pub start: i64,
+    /// Range end
+    pub end: i64,
+}
+
+#[derive(Debug, Clone)]
+/// A single file or part of a file that should be read, along with its schema, statistics
 /// and partition column values that need to be appended to each row.
 pub struct PartitionedFile {
     /// Path for the file (e.g. URL, filesystem path, etc)
     pub file_meta: FileMeta,
     /// Values of partition columns to be appended to each row
    pub partition_values: Vec<ScalarValue>,
-    // We may include row group range here for a more fine-grained parallel execution
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
 }
 
 impl PartitionedFile {
@@ -144,6 +154,19 @@ impl PartitionedFile {
                 last_modified: None,
             },
             partition_values: vec![],
+            range: None,
+        }
+    }
+
+    /// Create a file range without metadata or partition
+    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
+        Self {
+            file_meta: FileMeta {
+                sized_file: SizedFile { path, size },
+                last_modified: None,
+            },
+            partition_values: vec![],
+            range: Some(FileRange { start, end }),
         }
     }
 }
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
index edfe5e2cecd6..973905077a9f 100644
--- a/datafusion/src/datasource/object_store/local.rs
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -185,6 +185,7 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
             last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
         },
         partition_values: vec![],
+        range: None,
     }
 }
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 2d23ca1c3ada..42c87f756e44 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -53,8 +53,7 @@ use arrow::{
 use log::debug;
 use parquet::arrow::ArrowWriter;
 use parquet::file::{
-    metadata::RowGroupMetaData,
-    reader::{FileReader, SerializedFileReader},
+    metadata::RowGroupMetaData, reader::SerializedFileReader,
     statistics::Statistics as ParquetStatistics,
 };
 
@@ -71,6 +70,7 @@ use tokio::{
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::physical_plan::file_format::SchemaAdapter;
 use async_trait::async_trait;
+use parquet::file::serialized_reader::ReadOptionsBuilder;
 
 use super::PartitionColumnProjector;
 
@@ -310,7 +310,7 @@ fn send_result(
 /// Wraps parquet statistics in a way
 /// that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
-    row_group_metadata: &'a [RowGroupMetaData],
+    row_group_metadata: &'a RowGroupMetaData,
     parquet_schema: &'a Schema,
 }
 
@@ -343,33 +343,25 @@ macro_rules! get_statistic {
 
 // Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
 macro_rules! get_min_max_values {
     ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
-        let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
-            (v, f)
-        } else {
-            // Named column was not present
-            return None
-        };
+        let (column_index, field) =
+            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
+                (v, f)
+            } else {
+                // Named column was not present
+                return None;
+            };
 
         let data_type = field.data_type();
         // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
         let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
-        let scalar_values : Vec<ScalarValue> = $self.row_group_metadata
-            .iter()
-            .flat_map(|meta| {
-                meta.column(column_index).statistics()
-            })
-            .map(|stats| {
-                get_statistic!(stats, $func, $bytes_func)
-            })
-            .map(|maybe_scalar| {
-                // column either did't have statistics at all or didn't have min/max values
-                maybe_scalar.unwrap_or_else(|| null_scalar.clone())
-            })
-            .collect();
-
-        // ignore errors converting to arrays (e.g. different types)
-        ScalarValue::iter_to_array(scalar_values).ok()
+        $self.row_group_metadata
+            .column(column_index)
+            .statistics()
+            .map(|stats| get_statistic!(stats, $func, $bytes_func))
+            .flatten()
+            // column either didn't have statistics at all or didn't have min/max values
+            .or_else(|| Some(null_scalar.clone()))
+            .map(|s| s.to_array())
    }}
 }
 
@@ -384,17 +376,14 @@ macro_rules! get_null_count_values {
             return None;
         };
 
-        let scalar_values: Vec<ScalarValue> = $self
-            .row_group_metadata
-            .iter()
-            .flat_map(|meta| meta.column(column_index).statistics())
-            .map(|stats| {
-                ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
-            })
-            .collect();
-
-        // ignore errors converting to arrays (e.g. different types)
-        ScalarValue::iter_to_array(scalar_values).ok()
+        let value = ScalarValue::UInt64(
+            $self
+                .row_group_metadata
+                .column(column_index)
+                .statistics()
+                .map(|s| s.null_count()),
+        );
+        Some(value.to_array())
     }};
 }
 
@@ -408,7 +397,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 
     fn num_containers(&self) -> usize {
-        self.row_group_metadata.len()
+        1
     }
 
     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
@@ -419,31 +408,33 @@ fn build_row_group_predicate(
     pruning_predicate: &PruningPredicate,
     metrics: ParquetFileMetrics,
-    row_group_metadata: &[RowGroupMetaData],
-) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
-    let parquet_schema = pruning_predicate.schema().as_ref();
-
-    let pruning_stats = RowGroupPruningStatistics {
-        row_group_metadata,
-        parquet_schema,
-    };
-    let predicate_values = pruning_predicate.prune(&pruning_stats);
-
-    match predicate_values {
-        Ok(values) => {
-            // NB: false means don't scan row group
-            let num_pruned = values.iter().filter(|&v| !*v).count();
-            metrics.row_groups_pruned.add(num_pruned);
-            Box::new(move |_, i| values[i])
-        }
-        // stats filter array could not be built
-        // return a closure which will not filter out any row groups
-        Err(e) => {
-            debug!("Error evaluating row group predicate values {}", e);
-            metrics.predicate_evaluation_errors.add(1);
-            Box::new(|_r, _i| true)
-        }
-    }
+) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {
+    let pruning_predicate = pruning_predicate.clone();
+    Box::new(
+        move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
+            let parquet_schema = pruning_predicate.schema().as_ref();
+            let pruning_stats = RowGroupPruningStatistics {
+                row_group_metadata,
+                parquet_schema,
+            };
+            let predicate_values = pruning_predicate.prune(&pruning_stats);
+            match predicate_values {
+                Ok(values) => {
+                    // NB: false means don't scan row group
+                    let num_pruned = values.iter().filter(|&v| !*v).count();
+                    metrics.row_groups_pruned.add(num_pruned);
+                    values[0]
+                }
+                // stats filter array could not be built
+                // return true so that no row groups are filtered out
+                Err(e) => {
+                    debug!("Error evaluating row group predicate values {}", e);
+                    metrics.predicate_evaluation_errors.add(1);
+                    true
+                }
+            }
+        },
+    )
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -471,17 +462,23 @@ fn read_partition(
             );
             let object_reader =
                 object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
-            let mut file_reader =
-                SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+
+            let mut opt = ReadOptionsBuilder::new();
             if let Some(pruning_predicate) = pruning_predicate {
-                let row_group_predicate = build_row_group_predicate(
+                opt = opt.with_predicate(build_row_group_predicate(
                     pruning_predicate,
                     file_metrics,
-                    file_reader.metadata().row_groups(),
-                );
-                file_reader.filter_row_groups(&row_group_predicate);
+                ));
+            }
+            if let Some(range) = &partitioned_file.range {
+                opt = opt.with_range(range.start, range.end);
             }
 
+            let file_reader = SerializedFileReader::new_with_options(
+                ChunkObjectReader(object_reader),
+                opt.build(),
+            )?;
+
             let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
             let adapted_projections =
                 schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
@@ -999,6 +996,7 @@ mod tests {
                 last_modified: None,
             },
            partition_values: vec![],
+            range: None,
        };
 
         let parquet_exec = ParquetExec::new(
@@ -1051,11 +1049,8 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1084,11 +1079,8 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1132,11 +1124,8 @@ mod tests {
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1150,11 +1139,8 @@ mod tests {
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
         let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1199,11 +1185,8 @@ mod tests {
         let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
         let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1231,11 +1214,8 @@ mod tests {
         let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
         let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 9869a1f6b16a..213bea6a8321 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -262,7 +262,7 @@ async fn prune_int32_scalar_fun() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 3, "{}", output.description());
 }
@@ -278,7 +278,7 @@ async fn prune_int32_complex_expr() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 2, "{}", output.description());
 }
@@ -294,7 +294,7 @@ async fn prune_int32_complex_expr_subtract() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 9, "{}", output.description());
 }
@@ -366,7 +366,7 @@ async fn prune_f64_scalar_fun() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 1, "{}", output.description());
 }
@@ -382,7 +382,7 @@ async fn prune_f64_complex_expr() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 9, "{}", output.description());
 }
@@ -398,7 +398,7 @@ async fn prune_f64_complex_expr_subtract() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 9, "{}", output.description());
 }

From e6a9cdd2ba0ddc4a785261a186ae66e5bf970d09 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Wed, 23 Mar 2022 11:51:25 +0800
Subject: [PATCH 02/10] fix

---
 ballista/rust/core/src/serde/physical_plan/from_proto.rs | 2 +-
 datafusion/src/dataframe.rs                              | 6 +++---
 datafusion/src/physical_plan/file_format/parquet.rs      | 3 ---
 3 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d7f8c101bc10..78f760309698 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -29,7 +29,7 @@ use chrono::{TimeZone, Utc};
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, SizedFile};
 use datafusion::datasource::{FileRange, PartitionedFile};
-use datafusion::execution::context::{ExecutionProps, SessionState};
+use datafusion::execution::context::ExecutionProps;
 
 use datafusion::physical_plan::file_format::FileScanConfig;
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index ae2cc765194a..c329aff6ef2d 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -351,7 +351,7 @@ impl DataFrame {
     pub async fn collect(&self) -> Result<Vec<RecordBatch>> {
         let plan = self.create_physical_plan().await?;
         let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone()));
-        Ok(collect(plan, task_ctx).await?)
+        collect(plan, task_ctx).await
     }
 
     /// Print results.
@@ -426,7 +426,7 @@ impl DataFrame {
     pub async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
         let plan = self.create_physical_plan().await?;
         let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone()));
-        Ok(collect_partitioned(plan, task_ctx).await?)
+        collect_partitioned(plan, task_ctx).await
     }
 
     /// Executes this DataFrame and returns one stream per partition.
@@ -447,7 +447,7 @@ impl DataFrame {
     ) -> Result<Vec<SendableRecordBatchStream>> {
         let plan = self.create_physical_plan().await?;
         let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone()));
-        Ok(execute_stream_partitioned(plan, task_ctx).await?)
+        execute_stream_partitioned(plan, task_ctx).await
     }
 
     /// Returns the schema describing the output of this DataFrame in terms of columns returned,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 54ad4885ee39..b1ac1b7df896 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -352,10 +352,7 @@ macro_rules! get_min_max_values {
 
         let data_type = field.data_type();
         // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
         let null_scalar: ScalarValue = data_type.try_into().ok()?;
-<<<<<<< HEAD
-=======
->>>>>>> apache/master
         $self.row_group_metadata
             .column(column_index)
             .statistics()
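For orientation before the CASE-expression patches below: the intended consumption of the `FileRange` API introduced in patches 01-02 is sketched here. This is an illustrative helper, not part of the patch; the two-way split and the function name are assumptions, and only the `PartitionedFile::new_with_range` constructor comes from the patch itself. A row group is read by whichever range contains its byte midpoint (see the doc comment added in patch 07), so the two halves together read the file exactly once:

    use datafusion::datasource::PartitionedFile;

    /// Split one Parquet file of `size` bytes into two non-overlapping scan
    /// units, each suitable for use as its own file group in a scan config.
    fn split_in_two(path: &str, size: u64) -> Vec<Vec<PartitionedFile>> {
        let mid = (size / 2) as i64;
        vec![
            vec![PartitionedFile::new_with_range(path.to_string(), size, 0, mid)],
            vec![PartitionedFile::new_with_range(path.to_string(), size, mid, i64::MAX)],
        ]
    }

Handing each inner vector to a separate file group yields one scan partition per byte range, which is how non-overlapping sections of a single Parquet file can be scanned in parallel.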
From 6a61735ab385f000421707381aeee8fdbac7f653 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Wed, 23 Mar 2022 17:46:57 +0800
Subject: [PATCH 03/10] WIP: case when expr works

---
 .../src/expressions/case.rs                   | 89 +++++++++++++++++--
 datafusion-physical-expr/src/physical_expr.rs | 77 +++++++++++++++-
 2 files changed, 160 insertions(+), 6 deletions(-)

diff --git a/datafusion-physical-expr/src/expressions/case.rs b/datafusion-physical-expr/src/expressions/case.rs
index 3bcb78a97745..e72a8151280a 100644
--- a/datafusion-physical-expr/src/expressions/case.rs
+++ b/datafusion-physical-expr/src/expressions/case.rs
@@ -341,12 +341,14 @@ impl CaseExpr {
             let when_value = self.when_then_expr[i].0.evaluate(batch)?;
             let when_value = when_value.into_array(batch.num_rows());
 
-            let then_value = self.when_then_expr[i].1.evaluate(batch)?;
-            let then_value = then_value.into_array(batch.num_rows());
-
             // build boolean array representing which rows match the "when" value
             let when_match = array_equals(&base_type, when_value, base_value.clone())?;
 
+            let then_value = self.when_then_expr[i]
+                .1
+                .evaluate_selection(batch, &when_match)?;
+            let then_value = then_value.into_array(batch.num_rows());
+
             current_value = Some(if_then_else(
                 &when_match,
                 then_value,
@@ -389,7 +391,9 @@ impl CaseExpr {
                 .downcast_ref::<BooleanArray>()
                 .expect("WHEN expression did not return a BooleanArray");
 
-            let then_value = self.when_then_expr[i].1.evaluate(batch)?;
+            let then_value = self.when_then_expr[i]
+                .1
+                .evaluate_selection(batch, when_value)?;
             let then_value = then_value.into_array(batch.num_rows());
 
             current_value = Some(if_then_else(
@@ -455,10 +459,11 @@ pub fn case(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::binary;
     use crate::expressions::col;
     use crate::expressions::lit;
+    use crate::expressions::{binary, cast};
     use arrow::array::StringArray;
+    use arrow::datatypes::DataType::Float64;
     use arrow::datatypes::*;
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
@@ -523,6 +528,39 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_with_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE a when 0 THEN float64(null) ELSE 25.0 / cast(a, float64) END
+        let when1 = lit(ScalarValue::Int32(Some(0)));
+        let then1 = lit(ScalarValue::Float64(None));
+        let else_value = binary(
+            lit(ScalarValue::Float64(Some(25.0))),
+            Operator::Divide,
+            cast(col("a", &schema)?, &batch.schema(), Float64)?,
+            &batch.schema(),
+        )?;
+
+        let expr = case(
+            Some(col("a", &schema)?),
+            &[(when1, then1)],
+            Some(else_value),
+        )?;
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = result
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("failed to downcast to Float64Array");
+
+        let expected = &Float64Array::from(vec![Some(25.0), None, None, Some(5.0)]);
+
+        assert_eq!(expected, result);
+
+        Ok(())
+    }
+
     #[test]
     fn case_without_expr() -> Result<()> {
         let batch = case_test_batch()?;
@@ -558,6 +596,47 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn case_without_expr_divide_by_zero() -> Result<()> {
+        let batch = case_test_batch1()?;
+        let schema = batch.schema();
+
+        // CASE WHEN a > 0 THEN 25.0 / cast(a, float64) ELSE float64(null) END
+        let when1 = binary(
+            col("a", &schema)?,
+            Operator::Gt,
+            lit(ScalarValue::Int32(Some(0))),
+            &batch.schema(),
+        )?;
+        let then1 = binary(
+            lit(ScalarValue::Float64(Some(25.0))),
+            Operator::Divide,
+            cast(col("a", &schema)?, &batch.schema(), Float64)?,
+            &batch.schema(),
+        )?;
+        let x = lit(ScalarValue::Float64(None));
+
+        let expr = case(None, &[(when1, then1)], Some(x))?;
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
+        let result = result
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("failed to downcast to Float64Array");
+
+        let expected = &Float64Array::from(vec![Some(25.0), None, None, Some(5.0)]);
+
+        assert_eq!(expected, result);
+
+        Ok(())
+    }
+
+    fn case_test_batch1() -> Result<RecordBatch> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let a = Int32Array::from(vec![Some(1), Some(0), None, Some(5)]);
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
+        Ok(batch)
+    }
+
     #[test]
     fn case_without_expr_else() -> Result<()> {
         let batch = case_test_batch()?;
diff --git a/datafusion-physical-expr/src/physical_expr.rs b/datafusion-physical-expr/src/physical_expr.rs
index 25885b1ab567..3de46d70a144 100644
--- a/datafusion-physical-expr/src/physical_expr.rs
+++ b/datafusion-physical-expr/src/physical_expr.rs
@@ -19,12 +19,17 @@
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
 
 use std::fmt::{Debug, Display};
 
+use arrow::array::{
+    make_array, Array, ArrayRef, BooleanArray, MutableArrayData, UInt64Array,
+};
+use arrow::compute::{take, SlicesIterator};
 use std::any::Any;
+use std::sync::Arc;
 
 /// Expression that can be evaluated against a RecordBatch
 /// A Physical expression knows its type, nullability and how to evaluate itself.
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
     fn nullable(&self, input_schema: &Schema) -> Result<bool>;
     /// Evaluate an expression against a RecordBatch
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+    /// Evaluate an expression against a RecordBatch with validity array
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        let mut indices = vec![];
+        for (i, b) in selection.iter().enumerate() {
+            if let Some(true) = b {
+                indices.push(i as u64);
+            }
+        }
+        let indices = UInt64Array::from_iter_values(indices);
+        let tmp_columns = batch
+            .columns()
+            .iter()
+            .map(|c| {
+                take(c.as_ref(), &indices, None)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))
+            })
+            .collect::<Result<Vec<ArrayRef>>>()?;
+
+        let tmp_batch = RecordBatch::try_new(batch.schema(), tmp_columns)?;
+        let tmp_result = self.evaluate(&tmp_batch)?;
+        if let ColumnarValue::Array(a) = tmp_result {
+            let result = scatter(selection, a.as_ref())?;
+            Ok(ColumnarValue::Array(result))
+        } else {
+            Ok(tmp_result)
+        }
+    }
+}
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
+/// are taken, when the mask evaluates `false` values null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` values
+/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
+fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+    let truthy = truthy.data();
+
+    let mut mutable = MutableArrayData::new(vec![&*truthy], true, mask.len());
+
+    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
+    // fill with falsy values
+
+    // keep track of how much is filled
+    let mut filled = 0;
+    // keep track of current position we have in truthy array
+    let mut true_pos = 0;
+
+    SlicesIterator::new(mask).for_each(|(start, end)| {
+        // the gap needs to be filled with nulls
+        if start > filled {
+            mutable.extend_nulls(start - filled);
+        }
+        // fill with truthy values
+        let len = end - start;
+        mutable.extend(0, true_pos, true_pos + len);
+        true_pos += len;
+        filled = end;
+    });
+    // the remaining part is falsy
+    if filled < truthy.len() {
+        mutable.extend_nulls(truthy.len() - filled);
+    }
+
+    let data = mutable.freeze();
+    Ok(make_array(data))
+}
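To make the `evaluate_selection` / `scatter` pair from patch 03 concrete: the default implementation first gathers only the selected rows with `take`, evaluates the expression on that shorter batch, and then scatters the results back to full length, with nulls in the unselected slots. Below is a standalone sketch of the gather step using only public arrow kernels; the column values are illustrative, and since `scatter` is private to the patch, its effect is described in a comment rather than called:

    use arrow::array::{Int32Array, UInt64Array};
    use arrow::compute::take;

    fn main() -> arrow::error::Result<()> {
        // a 4-row column with selection mask [true, false, false, true]
        let col = Int32Array::from(vec![10, 20, 30, 40]);
        // indices of the `true` slots, as evaluate_selection builds them
        let indices = UInt64Array::from_iter_values(vec![0u64, 3]);
        // gather step: only the selected rows survive -> [10, 40]
        let gathered = take(&col, &indices, None)?;
        // the expression then runs on this 2-row batch; scatter() afterwards
        // spreads the 2 results back over 4 slots: [r0, null, null, r1]
        println!("{:?}", gathered);
        Ok(())
    }

This is what lets a THEN branch such as `25.0 / a` execute only on rows where the WHEN predicate held, which the divide-by-zero tests above rely on.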
From bc78f970a13bb70de2011d84ceb414b8ecec8fb0 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 24 Mar 2022 11:15:43 +0800
Subject: [PATCH 04/10] short-circuit case_when

---
 .../src/expressions/case.rs | 97 +++++++++++--------
 1 file changed, 55 insertions(+), 42 deletions(-)

diff --git a/datafusion-physical-expr/src/expressions/case.rs b/datafusion-physical-expr/src/expressions/case.rs
index e72a8151280a..b1eb241fcdae 100644
--- a/datafusion-physical-expr/src/expressions/case.rs
+++ b/datafusion-physical-expr/src/expressions/case.rs
@@ -20,7 +20,7 @@ use std::{any::Any, sync::Arc};
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
 use arrow::array::{self, *};
-use arrow::compute::{eq, eq_utf8};
+use arrow::compute::{and, eq, eq_utf8, is_null, not, or, or_kleene};
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, Result};
@@ -323,24 +323,19 @@ impl CaseExpr {
         let base_value = expr.evaluate(batch)?;
         let base_type = expr.data_type(&batch.schema())?;
         let base_value = base_value.into_array(batch.num_rows());
+        let base_nulls = is_null(base_value.as_ref())?;
 
-        // start with the else condition, or nulls
-        let mut current_value: Option<ArrayRef> = if let Some(e) = &self.else_expr {
-            // keep `else_expr`'s data type and return type consistent
-            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
-                .unwrap_or_else(|_| e.clone());
-            Some(expr.evaluate(batch)?.into_array(batch.num_rows()))
-        } else {
-            Some(new_null_array(&return_type, batch.num_rows()))
-        };
-
-        // walk backwards through the when/then expressions
-        for i in (0..self.when_then_expr.len()).rev() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, batch.num_rows());
+        // We only consider non-null values while comparing with whens
+        let mut remainder = not(&base_nulls)?;
+        for i in 0..self.when_then_expr.len() {
             let i = i as usize;
-            let when_value = self.when_then_expr[i].0.evaluate(batch)?;
+            let when_value = self.when_then_expr[i]
+                .0
+                .evaluate_selection(batch, &remainder)?;
             let when_value = when_value.into_array(batch.num_rows());
-
             // build boolean array representing which rows match the "when" value
             let when_match = array_equals(&base_type, when_value, base_value.clone())?;
@@ -349,15 +344,24 @@ impl CaseExpr {
                 .evaluate_selection(batch, &when_match)?;
             let then_value = then_value.into_array(batch.num_rows());
 
-            current_value = Some(if_then_else(
-                &when_match,
-                then_value,
-                current_value.unwrap(),
-                &return_type,
-            )?);
+            current_value =
+                if_then_else(&when_match, then_value, current_value, &return_type)?;
+
+            remainder = and(&remainder, &or_kleene(&not(&when_match)?, &base_nulls)?)?;
+        }
+
+        if let Some(e) = &self.else_expr {
+            // keep `else_expr`'s data type and return type consistent
+            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
+                .unwrap_or_else(|_| e.clone());
+            let else_ = expr
+                .evaluate_selection(batch, &remainder)?
+                .into_array(batch.num_rows());
+            remainder = or(&base_nulls, &remainder)?;
+            current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
         }
 
-        Ok(ColumnarValue::Array(current_value.unwrap()))
+        Ok(ColumnarValue::Array(current_value))
     }
 
     /// This function evaluates the form of CASE where each WHEN expression is a boolean
@@ -370,20 +374,15 @@ impl CaseExpr {
     fn case_when_no_expr(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let return_type = self.when_then_expr[0].1.data_type(&batch.schema())?;
 
-        // start with the else condition, or nulls
-        let mut current_value: Option<ArrayRef> = if let Some(e) = &self.else_expr {
-            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
-                .unwrap_or_else(|_| e.clone());
-            Some(expr.evaluate(batch)?.into_array(batch.num_rows()))
-        } else {
-            Some(new_null_array(&return_type, batch.num_rows()))
-        };
-
-        // walk backwards through the when/then expressions
-        for i in (0..self.when_then_expr.len()).rev() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, batch.num_rows());
+        let mut remainder = BooleanArray::from(vec![true; batch.num_rows()]);
+        for i in 0..self.when_then_expr.len() {
             let i = i as usize;
-            let when_value = self.when_then_expr[i].0.evaluate(batch)?;
+            let when_value = self.when_then_expr[i]
+                .0
+                .evaluate_selection(batch, &remainder)?;
             let when_value = when_value.into_array(batch.num_rows());
             let when_value = when_value
                 .as_ref()
@@ -396,20 +395,34 @@ impl CaseExpr {
                 .evaluate_selection(batch, when_value)?;
             let then_value = then_value.into_array(batch.num_rows());
 
-            current_value = Some(if_then_else(
-                when_value,
-                then_value,
-                current_value.unwrap(),
-                &return_type,
-            )?);
+            current_value =
+                if_then_else(when_value, then_value, current_value, &return_type)?;
+
+            // Succeeded tuples should be filtered out for short-circuit evaluation,
+            // null values for the current when expr should be kept
+            remainder = and(
+                &remainder,
+                &or_kleene(&not(when_value)?, &is_null(when_value)?)?,
+            )?;
+        }
+
+        if let Some(e) = &self.else_expr {
+            // keep `else_expr`'s data type and return type consistent
+            let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
+                .unwrap_or_else(|_| e.clone());
+            let else_ = expr
+                .evaluate_selection(batch, &remainder)?
+                .into_array(batch.num_rows());
+            remainder = or(&is_null(&remainder)?, &remainder)?;
+            current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
         }
 
-        Ok(ColumnarValue::Array(current_value.unwrap()))
+        Ok(ColumnarValue::Array(current_value))
     }
 }
 
 impl PhysicalExpr for CaseExpr {
-    /// Return a reference to Any that can be used for downcasting
+    /// Return a reference to Any that can be used for down-casting
     fn as_any(&self) -> &dyn Any {
         self
     }

From 602034c8518778fe4b9ee0e3dc35543c9d906655 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 24 Mar 2022 12:13:37 +0800
Subject: [PATCH 05/10] else

---
 datafusion-physical-expr/src/expressions/case.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion-physical-expr/src/expressions/case.rs b/datafusion-physical-expr/src/expressions/case.rs
index b1eb241fcdae..93f39c2870df 100644
--- a/datafusion-physical-expr/src/expressions/case.rs
+++ b/datafusion-physical-expr/src/expressions/case.rs
@@ -354,10 +354,11 @@ impl CaseExpr {
             // keep `else_expr`'s data type and return type consistent
             let expr = try_cast(e.clone(), &*batch.schema(), return_type.clone())
                 .unwrap_or_else(|_| e.clone());
+            // null and unmatched tuples should be assigned else value
+            remainder = or(&base_nulls, &remainder)?;
             let else_ = expr
                 .evaluate_selection(batch, &remainder)?
                 .into_array(batch.num_rows());
-            remainder = or(&base_nulls, &remainder)?;
             current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
         }
 
@@ -413,7 +414,6 @@ impl CaseExpr {
             let else_ = expr
                 .evaluate_selection(batch, &remainder)?
                 .into_array(batch.num_rows());
-            remainder = or(&is_null(&remainder)?, &remainder)?;
             current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
         }
 
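Summarizing the masking discipline that patches 03-05 converge on before the range work resumes: a running `remainder` mask tracks the rows not yet claimed by any WHEN arm; each arm evaluates only under the current mask, matched rows are then removed from it, and the ELSE arm finally runs on whatever is left. A toy walkthrough with arrow's boolean kernels (row values are illustrative):

    use arrow::array::BooleanArray;
    use arrow::compute::{and, not};

    fn main() -> arrow::error::Result<()> {
        // four rows, all still unclaimed
        let remainder = BooleanArray::from(vec![true, true, true, true]);
        // WHEN #1 matched rows 0 and 2
        let when1 = BooleanArray::from(vec![true, false, true, false]);
        // rows left for WHEN #2 / ELSE: [false, true, false, true]
        let remainder = and(&remainder, &not(&when1)?)?;
        assert_eq!(remainder, BooleanArray::from(vec![false, true, false, true]));
        Ok(())
    }

The real code additionally folds null handling into this update with `or_kleene` and `is_null`, as the hunks above show.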
From bba90fc81563018c0050477e2631a8333c159404 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 7 Apr 2022 16:28:08 +0800
Subject: [PATCH 06/10] only range part

---
 .../src/serde/physical_plan/from_proto.rs     | 14 +++++++++-
 .../core/src/serde/physical_plan/to_proto.rs  | 14 +++++++++-
 datafusion/core/src/datasource/listing/mod.rs | 27 ++++++++++++++++++-
 3 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index cc7f866e91a0..7e759bd606eb 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use chrono::{TimeZone, Utc};
 use datafusion::datafusion_data_access::{
     object_store::local::LocalFileSystem, FileMeta, SizedFile,
 };
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::execution::context::ExecutionProps;
 
 use datafusion::logical_plan::FunctionRegistry;
@@ -301,6 +301,18 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&protobuf::FileRange> for FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &protobuf::FileRange) -> Result<Self, Self::Error> {
+        Ok(FileRange {
+            start: value.start,
+            end: value.end,
         })
     }
 }
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 1a1276ec17ed..55332211db79 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     Statistics,
 };
 
-use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::physical_plan::file_format::FileScanConfig;
 
 use datafusion::physical_plan::expressions::{Count, Literal};
@@ -354,6 +354,18 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .iter()
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
+            range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+        })
+    }
+}
+
+impl TryFrom<&FileRange> for protobuf::FileRange {
+    type Error = BallistaError;
+
+    fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
+        Ok(protobuf::FileRange {
+            start: value.start,
+            end: value.end,
         })
    }
 }
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 200349b5efb0..ede286191abe 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -32,7 +32,17 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 pub type PartitionedFileStream =
     Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
 
+/// File part identified by [start, end) positions.
 #[derive(Debug, Clone)]
+pub struct FileRange {
+    /// Range start
+    pub start: i64,
+    /// Range end
+    pub end: i64,
+}
+
+#[derive(Debug, Clone)]
+/// A single file or part of a file that should be read, along with its schema, statistics
 /// A single file that should be read, along with its schema, statistics
 /// and partition column values that need to be appended to each row.
 pub struct PartitionedFile {
@@ -40,7 +50,8 @@ pub struct PartitionedFile {
     pub file_meta: FileMeta,
     /// Values of partition columns to be appended to each row
     pub partition_values: Vec<ScalarValue>,
-    // We may include row group range here for a more fine-grained parallel execution
+    /// An optional file range for a more fine-grained parallel execution
+    pub range: Option<FileRange>,
 }
 
 impl PartitionedFile {
@@ -52,6 +63,19 @@ impl PartitionedFile {
                 last_modified: None,
             },
             partition_values: vec![],
+            range: None,
+        }
+    }
+
+    /// Create a file range without metadata or partition
+    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
+        Self {
+            file_meta: FileMeta {
+                sized_file: SizedFile { path, size },
+                last_modified: None,
+            },
+            partition_values: vec![],
+            range: Some(FileRange { start, end }),
         }
     }
 }
@@ -67,5 +91,6 @@ pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
     PartitionedFile {
         file_meta: local::local_unpartitioned_file(file),
         partition_values: vec![],
+        range: None,
     }
 }

From 636631da6d4d887e31344eee18074893f6865b71 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Sun, 10 Apr 2022 22:02:09 +0800
Subject: [PATCH 07/10] Update datafusion/core/src/datasource/listing/mod.rs

Co-authored-by: Andrew Lamb
---
 datafusion/core/src/datasource/listing/mod.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index ede286191abe..36e5d7c4193e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -32,7 +32,9 @@ pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 pub type PartitionedFileStream =
     Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
 
-/// File part identified by [start, end) positions.
+/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
+/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
+/// sections of a Parquet file in parallel. 
 #[derive(Debug, Clone)]
 pub struct FileRange {
     /// Range start
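The doc comment adopted in patch 07 pins down the selection rule that the test in the next patch exercises: a row group is scanned by a `FileRange` iff the midpoint of its byte span falls in [start, end). A self-contained sketch of that rule, where the struct is an illustrative stand-in for parquet's row-group metadata rather than the real type:

    /// Illustrative stand-in: where a row group's data starts and its length.
    struct RowGroup {
        offset: i64,
        len: i64,
    }

    /// A row group belongs to [start, end) iff its midpoint does, so
    /// non-overlapping ranges partition a file's row groups exactly.
    fn in_range(rg: &RowGroup, start: i64, end: i64) -> bool {
        let mid = rg.offset + rg.len / 2;
        start <= mid && mid < end
    }

    fn main() {
        let rg = RowGroup { offset: 4, len: 100 }; // midpoint at byte 54
        assert!(in_range(&rg, 0, 104));
        assert!(!in_range(&rg, 0, 5)); // why a [0, 5) range can read no rows
        assert!(!in_range(&rg, 104, i64::MAX));
    }

This also explains the expectations below: alltypes_plain.parquet holds its 8 rows in a single row group whose midpoint lies past byte 5, so the [0, 5) group yields nothing while [5, i64::MAX) yields all 8 rows.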
This option can be used to scan non-overlapping +/// sections of a Parquet file in parallel. #[derive(Debug, Clone)] pub struct FileRange { /// Range start From ca559605b9ad63dcd0255bbc6dad2d2129b0803b Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 10 Apr 2022 23:24:15 +0800 Subject: [PATCH 08/10] test --- datafusion/core/src/datasource/listing/mod.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 77 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 36e5d7c4193e..d7932b38f62e 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -34,7 +34,7 @@ pub type PartitionedFileStream = /// Only scan a subset of Row Groups from the Parquet file whose data "midpoint" /// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping -/// sections of a Parquet file in parallel. +/// sections of a Parquet file in parallel. #[derive(Debug, Clone)] pub struct FileRange { /// Range start diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index e4cae9c169a3..308e3f2ab4e2 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -660,6 +660,7 @@ mod tests { }; use super::*; + use crate::datasource::listing::FileRange; use crate::execution::options::CsvReadOptions; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use arrow::array::Float32Array; @@ -667,6 +668,7 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field}, }; + use datafusion_data_access::object_store::local; use futures::StreamExt; use parquet::{ arrow::ArrowWriter, @@ -1003,6 +1005,81 @@ mod tests { Ok(()) } + #[tokio::test] + async fn parquet_exec_with_range() -> Result<()> { + fn file_range(file: String, start: i64, end: i64) -> PartitionedFile { + PartitionedFile { + file_meta: local::local_unpartitioned_file(file), + partition_values: vec![], + range: Some(FileRange { start, end }), + } + } + + async fn assert_parquet_read( + file_groups: Vec>, + expected_row_num: Option, + task_ctx: Arc, + file_schema: SchemaRef, + ) -> Result<()> { + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups, + file_schema, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + None, + ); + assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); + let results = parquet_exec.execute(0, task_ctx).await?.next().await; + + if let Some(expected_row_num) = expected_row_num { + let batch = results.unwrap()?; + assert_eq!(expected_row_num, batch.num_rows()); + } else { + assert!(results.is_none()); + } + + Ok(()) + } + + let session_ctx = SessionContext::new(); + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/alltypes_plain.parquet", testdata); + let file_schema = ParquetFormat::default() + .infer_schema(local_object_reader_stream(vec![filename.clone()])) + .await?; + + let group_empty = vec![vec![file_range(filename.clone(), 0, 5)]]; + let group_contain = vec![vec![file_range(filename.clone(), 5, i64::MAX)]]; + let group_all = vec![vec![ + file_range(filename.clone(), 0, 5), + file_range(filename.clone(), 5, i64::MAX), + ]]; + + assert_parquet_read( + group_empty, + 
None, + session_ctx.task_ctx(), + file_schema.clone(), + ) + .await?; + assert_parquet_read( + group_contain, + Some(8), + session_ctx.task_ctx(), + file_schema.clone(), + ) + .await?; + assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema) + .await?; + + Ok(()) + } + #[tokio::test] async fn parquet_exec_with_partition() -> Result<()> { let session_ctx = SessionContext::new(); From b574a94907031312cb4e4c5f0ddbc02253312ff6 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 12 Apr 2022 16:17:11 +0800 Subject: [PATCH 09/10] Update parquet.rs --- datafusion/core/src/physical_plan/file_format/parquet.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 50878ffd52f4..b0f023e87a17 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -538,6 +538,11 @@ fn read_partition( )); } if let Some(range) = &partitioned_file.range { + assert!( + range.start >= 0 && range.end > 0 && range.end > range.start, + "invalid range specified: {:?}", + range + ); opt = opt.with_range(range.start, range.end); } From f20ef44f9f12eaac5e00429296218b7b42bbb613 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 14 Apr 2022 00:19:40 +0800 Subject: [PATCH 10/10] fix --- datafusion/core/src/physical_plan/file_format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 04428e414ed8..cfc99a71a29a 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -337,7 +337,7 @@ impl ParquetExecStream { file_metrics, )); } - if let Some(range) = &partitioned_file.range { + if let Some(range) = &file.range { assert!( range.start >= 0 && range.end > 0 && range.end > range.start, "invalid range specified: {:?}",