diff --git a/Cargo.lock b/Cargo.lock index a623a57a1acc..0131dbe852c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2146,6 +2146,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-optimizer", "datafusion-physical-plan", + "datafusion-pruning", "datafusion-session", "futures", "itertools 0.14.0", @@ -2558,6 +2559,7 @@ dependencies = [ "arrow", "arrow-schema", "datafusion-common", + "datafusion-datasource", "datafusion-expr", "datafusion-expr-common", "datafusion-functions-nested", @@ -2565,6 +2567,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "insta", + "itertools 0.14.0", "log", ] diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 87a5ed33f127..098dc366ea66 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -152,6 +152,10 @@ impl TestOutput { self.metric_value("row_groups_pruned_statistics") } + fn files_ranges_pruned_statistics(&self) -> Option { + self.metric_value("files_ranges_pruned_statistics") + } + /// The number of row_groups matched by bloom filter or statistics fn row_groups_matched(&self) -> Option { self.row_groups_matched_bloom_filter() @@ -192,6 +196,8 @@ impl ContextWithParquet { unit: Unit, mut config: SessionConfig, ) -> Self { + // Use a single partition for deterministic results no matter how many CPUs the host has + config = config.with_target_partitions(1); let file = match unit { Unit::RowGroup(row_per_group) => { config = config.with_parquet_bloom_filter_pruning(true); diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index a88c0773e040..8613cd481be1 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -31,6 +31,7 @@ struct RowGroupPruningTest { expected_errors: Option, expected_row_group_matched_by_statistics: Option, expected_row_group_pruned_by_statistics: Option, + expected_files_pruned_by_statistics: Option, expected_row_group_matched_by_bloom_filter: Option, expected_row_group_pruned_by_bloom_filter: Option, expected_results: usize, @@ -44,6 +45,7 @@ impl RowGroupPruningTest { expected_errors: None, expected_row_group_matched_by_statistics: None, expected_row_group_pruned_by_statistics: None, + expected_files_pruned_by_statistics: None, expected_row_group_matched_by_bloom_filter: None, expected_row_group_pruned_by_bloom_filter: None, expected_results: 0, @@ -80,6 +82,11 @@ impl RowGroupPruningTest { self } + fn with_pruned_files(mut self, pruned_files: Option) -> Self { + self.expected_files_pruned_by_statistics = pruned_files; + self + } + // Set the expected matched row groups by bloom filter fn with_matched_by_bloom_filter(mut self, matched_by_bf: Option) -> Self { self.expected_row_group_matched_by_bloom_filter = matched_by_bf; @@ -121,6 +128,11 @@ impl RowGroupPruningTest { self.expected_row_group_pruned_by_statistics, "mismatched row_groups_pruned_statistics", ); + assert_eq!( + output.files_ranges_pruned_statistics(), + self.expected_files_pruned_by_statistics, + "mismatched files_ranges_pruned_statistics", + ); assert_eq!( output.row_groups_matched_bloom_filter(), self.expected_row_group_matched_by_bloom_filter, @@ -148,6 +160,7 @@ async fn prune_timestamps_nanos() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) 
.with_expected_rows(10) @@ -165,6 +178,7 @@ async fn prune_timestamps_micros() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(10) @@ -182,6 +196,7 @@ async fn prune_timestamps_millis() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(10) @@ -199,6 +214,7 @@ async fn prune_timestamps_seconds() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(10) @@ -214,6 +230,7 @@ async fn prune_date32() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -256,6 +273,7 @@ async fn prune_disabled() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(10) @@ -301,6 +319,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(11) @@ -315,6 +334,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(11) @@ -330,6 +350,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -344,6 +365,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -359,6 +381,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(3) @@ -374,6 +397,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -389,6 +413,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(9) @@ -405,6 +430,7 @@ macro_rules! 
int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -422,6 +448,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(1)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(0) @@ -438,6 +465,7 @@ macro_rules! int_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(4)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(4)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(19) @@ -467,6 +495,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(11) @@ -482,6 +511,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -496,6 +526,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -511,6 +542,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -526,6 +558,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -542,6 +575,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -559,6 +593,7 @@ macro_rules! uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(4)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(0) @@ -575,6 +610,7 @@ macro_rules! 
uint_tests { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(4)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(4)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(19) @@ -604,6 +640,7 @@ async fn prune_int32_eq_large_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(0) @@ -626,6 +663,7 @@ async fn prune_uint32_eq_large_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(0) @@ -641,6 +679,7 @@ async fn prune_f64_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(11) @@ -652,6 +691,7 @@ async fn prune_f64_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(11) @@ -669,6 +709,7 @@ async fn prune_f64_scalar_fun_and_gt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -685,6 +726,7 @@ async fn prune_f64_scalar_fun() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(1) @@ -701,6 +743,7 @@ async fn prune_f64_complex_expr() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(9) @@ -717,6 +760,7 @@ async fn prune_f64_complex_expr_subtract() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(9) @@ -735,6 +779,7 @@ async fn prune_decimal_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(6) @@ -746,6 +791,7 @@ async fn prune_decimal_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(8) @@ -757,6 +803,7 @@ async fn prune_decimal_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(6) @@ -768,6 +815,7 @@ async fn prune_decimal_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) 
.with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(8) @@ -786,6 +834,7 @@ async fn prune_decimal_eq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(2)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -797,6 +846,7 @@ async fn prune_decimal_eq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(2)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -809,6 +859,7 @@ async fn prune_decimal_eq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(2)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -820,6 +871,7 @@ async fn prune_decimal_eq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(2)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -839,6 +891,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(5) @@ -850,6 +903,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(6) @@ -861,6 +915,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(5) @@ -872,6 +927,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(6) @@ -885,6 +941,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(2)) .with_expected_rows(1) @@ -898,6 +955,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(2)) .with_expected_rows(1) @@ -911,6 +969,7 @@ async fn prune_decimal_in_list() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(2)) .with_expected_rows(1) @@ -929,6 +988,7 @@ async fn prune_string_eq_match() { // false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three' .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(1) @@ -947,6 +1007,7 @@ async fn prune_string_eq_no_match() { // false positive on 'all backends' batch: 
'backend five' < 'backend one' < 'backend three' .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(0) @@ -963,6 +1024,7 @@ async fn prune_string_eq_no_match() { // false positive on 'mixed' batch: 'backend one' < 'frontend nine' < 'frontend six' .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(2)) .with_expected_rows(0) @@ -980,6 +1042,7 @@ async fn prune_string_neq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(3)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(14) @@ -998,6 +1061,7 @@ async fn prune_string_lt() { // matches 'all backends' only .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(3) @@ -1012,6 +1076,7 @@ async fn prune_string_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) // all backends from 'mixed' and 'all backends' @@ -1031,6 +1096,7 @@ async fn prune_binary_eq_match() { // false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three' .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(1) @@ -1049,6 +1115,7 @@ async fn prune_binary_eq_no_match() { // false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three' .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(0) @@ -1065,6 +1132,7 @@ async fn prune_binary_eq_no_match() { // false positive on 'mixed' batch: 'backend one' < 'frontend nine' < 'frontend six' .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(2)) .with_expected_rows(0) @@ -1082,6 +1150,7 @@ async fn prune_binary_neq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(3)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(14) @@ -1100,6 +1169,7 @@ async fn prune_binary_lt() { // matches 'all backends' only .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(3) @@ -1114,6 +1184,7 @@ async fn prune_binary_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) // all backends from 'mixed' and 'all backends' @@ -1133,6 +1204,7 @@ async fn prune_fixedsizebinary_eq_match() { // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7' .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) 
.with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(1) @@ -1148,6 +1220,7 @@ async fn prune_fixedsizebinary_eq_match() { // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7' .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(1) @@ -1166,6 +1239,7 @@ async fn prune_fixedsizebinary_eq_no_match() { // false positive on 'mixed' batch: 'be1' < 'be9' < 'fe4' .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(1)) .with_expected_rows(0) @@ -1183,6 +1257,7 @@ async fn prune_fixedsizebinary_neq() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(3)) .with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(3)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(14) @@ -1201,6 +1276,7 @@ async fn prune_fixedsizebinary_lt() { // matches 'all backends' only .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -1215,6 +1291,7 @@ async fn prune_fixedsizebinary_lt() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) // all backends from 'mixed' and 'all backends' @@ -1235,6 +1312,7 @@ async fn prune_periods_in_column_names() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(2)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(7) @@ -1246,6 +1324,7 @@ async fn prune_periods_in_column_names() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(5) @@ -1257,6 +1336,7 @@ async fn prune_periods_in_column_names() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_matched_by_bloom_filter(Some(1)) .with_pruned_by_bloom_filter(Some(0)) .with_expected_rows(2) @@ -1277,6 +1357,7 @@ async fn test_row_group_with_null_values() { .with_query("SELECT * FROM t WHERE \"i8\" <= 5") .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_pruned_by_stats(Some(2)) .with_expected_rows(5) .with_matched_by_bloom_filter(Some(0)) @@ -1290,6 +1371,7 @@ async fn test_row_group_with_null_values() { .with_query("SELECT * FROM t WHERE \"i8\" is Null") .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) + .with_pruned_files(Some(0)) .with_pruned_by_stats(Some(1)) .with_expected_rows(10) .with_matched_by_bloom_filter(Some(0)) @@ -1303,6 +1385,7 @@ async fn test_row_group_with_null_values() { .with_query("SELECT * FROM t WHERE \"i16\" is Not Null") .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_pruned_by_stats(Some(2)) .with_expected_rows(5) .with_matched_by_bloom_filter(Some(0)) @@ -1317,6 +1400,7 @@ async fn test_row_group_with_null_values() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(0)) 
.with_pruned_by_stats(Some(0)) + .with_pruned_files(Some(1)) .with_expected_rows(0) .with_matched_by_bloom_filter(Some(0)) .with_pruned_by_bloom_filter(Some(0)) @@ -1332,6 +1416,7 @@ async fn test_bloom_filter_utf8_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1344,6 +1429,7 @@ async fn test_bloom_filter_utf8_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1356,6 +1442,7 @@ async fn test_bloom_filter_utf8_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1368,6 +1455,7 @@ async fn test_bloom_filter_utf8_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1383,6 +1471,7 @@ async fn test_bloom_filter_integer_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1395,6 +1484,7 @@ async fn test_bloom_filter_integer_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1407,6 +1497,7 @@ async fn test_bloom_filter_integer_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1419,6 +1510,7 @@ async fn test_bloom_filter_integer_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1434,6 +1526,7 @@ async fn test_bloom_filter_unsigned_integer_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1446,6 +1539,7 @@ async fn test_bloom_filter_unsigned_integer_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1461,6 +1555,7 @@ async fn test_bloom_filter_binary_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1473,6 +1568,7 @@ async fn test_bloom_filter_binary_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) 
.with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1485,6 +1581,7 @@ async fn test_bloom_filter_binary_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1499,6 +1596,7 @@ async fn test_bloom_filter_binary_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) @@ -1514,6 +1612,7 @@ async fn test_bloom_filter_decimal_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(1) .with_pruned_by_bloom_filter(Some(0)) .with_matched_by_bloom_filter(Some(1)) @@ -1526,6 +1625,7 @@ async fn test_bloom_filter_decimal_dict() { .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(1)) + .with_pruned_files(Some(0)) .with_expected_rows(0) .with_pruned_by_bloom_filter(Some(1)) .with_matched_by_bloom_filter(Some(0)) diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index b6a548c998dc..08d258852a20 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -45,6 +45,7 @@ datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } +datafusion-pruning = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 9acd39b37e83..574fe2a040ea 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -27,10 +27,21 @@ use datafusion_physical_plan::metrics::{ /// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory #[derive(Debug, Clone)] pub struct ParquetFileMetrics { - /// Number of files pruned by partition of file level statistics - /// This often happens at planning time but may happen at execution time + /// Number of file **ranges** pruned by partition or file level statistics. + /// Pruning of files often happens at planning time but may happen at execution time /// if dynamic filters (e.g. from a join) result in additional pruning. - pub files_pruned_statistics: Count, + /// + /// This does **not** necessarily equal the number of files pruned: + /// files may be scanned in sub-ranges to increase parallelism, + /// in which case this will represent the number of sub-ranges pruned, not the number of files. + /// The number of files pruned will always be less than or equal to this number. + /// + /// A single file may have some ranges that are not pruned and some that are pruned. + /// For example, with a query like `ORDER BY col LIMIT 10`, the TopK dynamic filter + /// pushdown optimization may fill up the TopK heap when reading the first part of a file, + /// then skip the second part if file statistics indicate it cannot contain rows + /// that would be in the TopK. 
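+ /// + /// For example, if a single file is scanned as two ranges to increase parallelism + /// and both ranges are skipped using statistics, this counter reports 2 even + /// though only one physical file was pruned.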
+ pub files_ranges_pruned_statistics: Count, /// Number of times the predicate could not be evaluated pub predicate_evaluation_errors: Count, /// Number of row groups whose bloom filters were checked and matched (not pruned) @@ -126,11 +137,11 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .subset_time("metadata_load_time", partition); - let files_pruned_statistics = - MetricBuilder::new(metrics).counter("files_pruned_statistics", partition); + let files_ranges_pruned_statistics = MetricBuilder::new(metrics) + .counter("files_ranges_pruned_statistics", partition); Self { - files_pruned_statistics, + files_ranges_pruned_statistics, predicate_evaluation_errors, row_groups_matched_bloom_filter, row_groups_pruned_bloom_filter, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 69ea7a4b7896..e7e8d5207039 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -29,21 +29,18 @@ use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; -use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit}; +use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit}; use arrow::error::ArrowError; -use datafusion_common::pruning::{ - CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics, - PruningStatistics, -}; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_datasource::PartitionedFile; use datafusion_physical_expr::PhysicalExprSchemaRewriter; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_optimizer::pruning::PruningPredicate; +use datafusion_physical_expr_common::physical_expr::{ + is_dynamic_physical_expr, PhysicalExpr, +}; use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; +use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; use futures::{StreamExt, TryStreamExt}; -use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; @@ -134,66 +131,40 @@ impl FileOpener for ParquetOpener { let enable_page_index = self.enable_page_index; Ok(Box::pin(async move { - // Prune this file using the file level statistics. + // Prune this file using the file level statistics and partition values. // Since dynamic filters may have been updated since planning it is possible that we are able // to prune files now that we couldn't prune at planning time. - if let Some(predicate) = &predicate { - // Build a pruning schema that combines the file fields and partition fields. - // Partition fileds are always at the end. - let pruning_schema = Arc::new( - Schema::new( - logical_file_schema - .fields() - .iter() - .cloned() - .chain(partition_fields.iter().cloned()) - .collect_vec(), + // It is assumed that there is no point in doing pruning here if the predicate is not dynamic, + // as it would have been done at planning time. + // We'll also check this after every record batch we read, + // and if at some point we are able to prove we can prune the file using just the file level statistics + // we can end the stream early. 
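+ // For example, with `ORDER BY col LIMIT 10` the TopK dynamic filter typically + // only becomes selective once its heap has filled while reading earlier batches, + // so a pruning check that failed when the file was opened may succeed on a later + // batch and let us end the stream early.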
+ let mut file_pruner = predicate + .as_ref() + .map(|p| { + Ok::<_, DataFusionError>( + (is_dynamic_physical_expr(p) | file.has_statistics()).then_some( + FilePruner::new( + Arc::clone(p), + &logical_file_schema, + partition_fields.clone(), + file.clone(), + predicate_creation_errors.clone(), + )?, + ), ) - .with_metadata(logical_file_schema.metadata().clone()), - ); - let pruning_predicate = build_pruning_predicate( - Arc::clone(predicate), - &pruning_schema, - &predicate_creation_errors, - ); - if let Some(pruning_predicate) = pruning_predicate { - // The partition column schema is the schema of the table - the schema of the file - let mut pruning = Box::new(PartitionPruningStatistics::try_new( - vec![file.partition_values.clone()], - partition_fields.clone(), - )?) - as Box; - if let Some(stats) = file.statistics { - let stats_pruning = Box::new(PrunableStatistics::new( - vec![stats], - Arc::clone(&pruning_schema), - )); - pruning = Box::new(CompositePruningStatistics::new(vec![ - pruning, - stats_pruning, - ])); - } - match pruning_predicate.prune(pruning.as_ref()) { - Ok(values) => { - assert!(values.len() == 1); - // We expect a single container -> if all containers are false skip this file - if values.into_iter().all(|v| !v) { - // Return an empty stream - file_metrics.files_pruned_statistics.add(1); - return Ok(futures::stream::empty().boxed()); - } - } - // Stats filter array could not be built, so we can't prune - Err(e) => { - debug!( - "Ignoring error building pruning predicate for file '{}': {e}", - file_meta.location(), - ); - predicate_creation_errors.add(1); - } - } + }) + .transpose()? + .flatten(); + + if let Some(file_pruner) = &mut file_pruner { + if file_pruner.should_prune()? { + // Return an empty stream immediately to skip the work of setting up the actual stream + file_metrics.files_ranges_pruned_statistics.add(1); + return Ok(futures::stream::empty().boxed()); } } + // Don't load the page index yet. Since it is not stored inline in // the footer, loading the page index if it is not needed will do // unecessary I/O. We decide later if it is needed to evaluate the @@ -439,30 +410,6 @@ fn create_initial_plan( Ok(ParquetAccessPlan::new_all(row_group_count)) } -/// Build a pruning predicate from an optional predicate expression. -/// If the predicate is None or the predicate cannot be converted to a pruning -/// predicate, return None. -/// If there is an error creating the pruning predicate it is recorded by incrementing -/// the `predicate_creation_errors` counter. -pub(crate) fn build_pruning_predicate( - predicate: Arc, - file_schema: &SchemaRef, - predicate_creation_errors: &Count, -) -> Option> { - match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) { - Ok(pruning_predicate) => { - if !pruning_predicate.always_true() { - return Some(Arc::new(pruning_predicate)); - } - } - Err(e) => { - debug!("Could not create pruning predicate for: {e}"); - predicate_creation_errors.add(1); - } - } - None -} - /// Build a page pruning predicate from an optional predicate expression. /// If the predicate is None or the predicate cannot be converted to a page pruning /// predicate, return None. 
@@ -554,7 +501,9 @@ mod test { schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile, }; use datafusion_expr::{col, lit}; - use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_expr::{ + expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr, + }; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{Stream, StreamExt}; use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; @@ -601,6 +550,13 @@ mod test { data_len } + fn make_dynamic_expr(expr: Arc) -> Arc { + Arc::new(DynamicFilterPhysicalExpr::new( + expr.children().into_iter().map(Arc::clone).collect(), + expr, + )) + } + #[tokio::test] async fn test_prune_on_statistics() { let store = Arc::new(InMemory::new()) as Arc; @@ -691,7 +647,7 @@ mod test { } #[tokio::test] - async fn test_prune_on_partition_statistics() { + async fn test_prune_on_partition_statistics_with_dynamic_expression() { let store = Arc::new(InMemory::new()) as Arc; let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); @@ -753,7 +709,9 @@ mod test { // Filter should match the partition value let expr = col("part").eq(lit(1)); - let predicate = logical2physical(&expr, &table_schema); + // Mark the expression as dynamic even if it's not to force partition pruning to happen + // Otherwise we assume it already happened at the planning stage and won't re-do the work here + let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); let stream = opener .open(make_meta(), file.clone()) @@ -766,7 +724,9 @@ mod test { // Filter should not match the partition value let expr = col("part").eq(lit(2)); - let predicate = logical2physical(&expr, &table_schema); + // Mark the expression as dynamic even if it's not to force partition pruning to happen + // Otherwise we assume it already happened at the planning stage and won't re-do the work here + let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; @@ -1005,4 +965,92 @@ mod test { assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); } + + /// Test that if the filter is not a dynamic filter and we have no stats we don't do extra pruning work at the file level. 
+ #[tokio::test] + async fn test_opener_pruning_skipped_on_static_filters() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await; + + let file_schema = batch.schema(); + let mut file = PartitionedFile::new( + "part=1/file.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + file.partition_values = vec![ScalarValue::Int32(Some(1))]; + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("part", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + + let make_opener = |predicate| { + ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(predicate), + logical_file_schema: file_schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new( + DefaultParquetFileReaderFactory::new(Arc::clone(&store)), + ), + partition_fields: vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))], + pushdown_filters: false, // note that this is false! + reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: true, + coerce_int96: None, + } + }; + + let make_meta = || FileMeta { + object_meta: ObjectMeta { + location: Path::from("part=1/file.parquet"), + last_modified: Utc::now(), + size: u64::try_from(data_size).unwrap(), + e_tag: None, + version: None, + }, + range: None, + extensions: None, + metadata_size_hint: None, + }; + + // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic + let expr = col("part").eq(lit(2)); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 3); + + // If we make the filter dynamic, it should prune + let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + } } diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index 84f5c4c2d6d5..5f3e05747d40 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -31,7 +31,7 @@ use arrow::{ use datafusion_common::pruning::PruningStatistics; use datafusion_common::ScalarValue; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; -use datafusion_physical_optimizer::pruning::PruningPredicate; +use datafusion_pruning::PruningPredicate; use log::{debug, trace}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index f9fb9214429d..51d50d780f10 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -24,7 +24,7 @@ use 
arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; -use datafusion_physical_optimizer::pruning::PruningPredicate; +use datafusion_pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_column; use parquet::basic::Type; diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index c79efd11fcc5..92e25a97c3a4 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -198,6 +198,23 @@ impl PartitionedFile { self.statistics = Some(statistics); self } + + /// Check if this file has any statistics. + /// This returns `true` if the file has any Exact or Inexact statistics + /// and `false` if all statistics are `Precision::Absent`. + pub fn has_statistics(&self) -> bool { + if let Some(stats) = &self.statistics { + stats.column_statistics.iter().any(|col_stats| { + col_stats.null_count != Precision::Absent + || col_stats.max_value != Precision::Absent + || col_stats.min_value != Precision::Absent + || col_stats.sum_value != Precision::Absent + || col_stats.distinct_count != Precision::Absent + }) + } else { + false + } + } } impl From<ObjectMeta> for PartitionedFile { diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 7be132fa6123..b4cb08715f53 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -27,7 +27,9 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; @@ -345,6 +347,24 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { // This is a safe default behavior. Ok(None) } + + /// Returns the generation of this `PhysicalExpr` for snapshotting purposes. + /// The generation is an arbitrary u64 that can be used to track changes + /// in the state of the `PhysicalExpr` over time without having to do an exhaustive comparison. + /// This is useful to avoid unnecessary computation or serialization if there are no changes to the expression. + /// It is particularly useful for dynamic expressions that may change over time, since it allows cheap checks for changes. + /// Static expressions that do not change over time should return 0, as does the default implementation. + /// You should not call this method directly as it does not handle recursion. + /// Instead use [`snapshot_generation`] to handle recursion and capture the + /// full state of the `PhysicalExpr`. + fn snapshot_generation(&self) -> u64 { + // By default, we return 0 to indicate that this PhysicalExpr does not + // have any dynamic references or state. + // Since the recursive algorithm sums the generations of all children, the overall + // generation will be 0 if no children have a non-zero generation, meaning that + // static expressions will always return 0.
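+ // For example, a static predicate such as `a > 5` keeps the default generation + // of 0, while an expression tree containing a `DynamicFilterPhysicalExpr` + // reports that filter's current (non-zero) generation.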
+ 0 + } } /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object @@ -537,3 +557,31 @@ pub fn snapshot_physical_expr( }) .data() } + +/// Check the generation of this `PhysicalExpr`. +/// Dynamic `PhysicalExpr`s may have a generation that is incremented +/// every time the state of the `PhysicalExpr` changes. +/// If the generation changes, that means this `PhysicalExpr` or one of its children +/// has changed since the last time it was evaluated. +/// +/// This algorithm will not produce collisions as long as the structure of the +/// `PhysicalExpr` does not change and no `PhysicalExpr` decrements its own generation. +pub fn snapshot_generation(expr: &Arc<dyn PhysicalExpr>) -> u64 { + let mut generation = 0u64; + expr.apply(|e| { + // Add the current generation of the `PhysicalExpr` to our global generation. + generation = generation.wrapping_add(e.snapshot_generation()); + Ok(TreeNodeRecursion::Continue) + }) + .expect("this traversal is infallible"); + + generation +} + +/// Check if the given `PhysicalExpr` is dynamic. +/// Internally this calls [`snapshot_generation`] to check if the generation is non-zero; +/// any dynamic `PhysicalExpr` should have a non-zero generation. +pub fn is_dynamic_physical_expr(expr: &Arc<dyn PhysicalExpr>) -> bool { + // If the generation is non-zero, then this `PhysicalExpr` is dynamic. + snapshot_generation(expr) != 0 +} diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 96f55503e2b1..ba30b916b9f8 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -43,7 +43,7 @@ pub struct DynamicFilterPhysicalExpr { /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>, /// The source of dynamic filters. - inner: Arc<RwLock<Arc<dyn PhysicalExpr>>>, + inner: Arc<RwLock<Inner>>, /// For testing purposes track the data type and nullability to make sure they don't change. /// If they do, there's a bug in the implementation. /// But this can have overhead in production, so it's only included in our tests. @@ -51,6 +51,25 @@ nullable: Arc<RwLock<Option<bool>>>, } +#[derive(Debug)] +struct Inner { + /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. + /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. + generation: u64, + expr: Arc<dyn PhysicalExpr>, +} + +impl Inner { + fn new(expr: Arc<dyn PhysicalExpr>) -> Self { + Self { + // Start with generation 1, which gives a different result for [`PhysicalExpr::snapshot_generation`] than the default 0. + // This is not currently used anywhere, but it seems useful to have this simple distinction. + generation: 1, + expr, + } + } +} + impl Hash for DynamicFilterPhysicalExpr { fn hash<H: Hasher>(&self, state: &mut H) { let inner = self.current().expect("Failed to get current expression"); @@ -111,7 +130,7 @@ impl DynamicFilterPhysicalExpr { Self { children, remapped_children: None, // Initially no remapped children - inner: Arc::new(RwLock::new(inner)), + inner: Arc::new(RwLock::new(Inner::new(inner))), data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } @@ -150,15 +169,17 @@ impl DynamicFilterPhysicalExpr { /// This will return the current expression with any children /// remapped to match calls to [`PhysicalExpr::with_new_children`].
pub fn current(&self) -> Result> { - let inner = self - .inner - .read() - .map_err(|_| { - datafusion_common::DataFusionError::Execution( - "Failed to acquire read lock for inner".to_string(), - ) - })? - .clone(); + let inner = Arc::clone( + &self + .inner + .read() + .map_err(|_| { + datafusion_common::DataFusionError::Execution( + "Failed to acquire read lock for inner".to_string(), + ) + })? + .expr, + ); let inner = Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?; Ok(inner) @@ -186,7 +207,10 @@ impl DynamicFilterPhysicalExpr { self.remapped_children.as_ref(), new_expr, )?; - *current = new_expr; + // Update the inner expression to the new expression. + current.expr = new_expr; + // Increment the generation to indicate that the expression has changed. + current.generation += 1; Ok(()) } } @@ -291,6 +315,14 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current expression as a snapshot. Ok(Some(self.current()?)) } + + fn snapshot_generation(&self) -> u64 { + // Return the current generation of the expression. + self.inner + .read() + .expect("Failed to acquire read lock for inner") + .generation + } } #[cfg(test)] diff --git a/datafusion/pruning/Cargo.toml b/datafusion/pruning/Cargo.toml index 306b74d74ee0..6acf178e4e2b 100644 --- a/datafusion/pruning/Cargo.toml +++ b/datafusion/pruning/Cargo.toml @@ -15,10 +15,12 @@ workspace = true arrow = { workspace = true } arrow-schema = { workspace = true } datafusion-common = { workspace = true, default-features = true } +datafusion-datasource = { workspace = true } datafusion-expr-common = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } +itertools = { workspace = true } log = { workspace = true } [dev-dependencies] diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs new file mode 100644 index 000000000000..bce1a64edaa3 --- /dev/null +++ b/datafusion/pruning/src/file_pruner.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
File-level pruning based on partition values and file-level statistics + +use std::sync::Arc; + +use arrow::datatypes::{FieldRef, Schema, SchemaRef}; +use datafusion_common::{ + pruning::{ + CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics, + PruningStatistics, + }, + Result, +}; +use datafusion_datasource::PartitionedFile; +use datafusion_physical_expr_common::physical_expr::{snapshot_generation, PhysicalExpr}; +use datafusion_physical_plan::metrics::Count; +use itertools::Itertools; +use log::debug; + +use crate::build_pruning_predicate; + +/// Prune based on partition values and file-level statistics. +pub struct FilePruner { + predicate_generation: Option<u64>, + predicate: Arc<dyn PhysicalExpr>, + /// Schema used for pruning, which combines the file schema and partition fields. + /// Partition fields are always at the end, as they are during scans. + pruning_schema: Arc<Schema>, + file: PartitionedFile, + partition_fields: Vec<FieldRef>, + predicate_creation_errors: Count, +} + +impl FilePruner { + pub fn new( + predicate: Arc<dyn PhysicalExpr>, + logical_file_schema: &SchemaRef, + partition_fields: Vec<FieldRef>, + file: PartitionedFile, + predicate_creation_errors: Count, + ) -> Result<Self> { + // Build a pruning schema that combines the file fields and partition fields. + // Partition fields are always at the end. + let pruning_schema = Arc::new( + Schema::new( + logical_file_schema + .fields() + .iter() + .cloned() + .chain(partition_fields.iter().cloned()) + .collect_vec(), + ) + .with_metadata(logical_file_schema.metadata().clone()), + ); + Ok(Self { + // Initialize the predicate generation to None so that the first time we call `should_prune` we actually check the predicate. + // Subsequent calls will only do work if the predicate itself has changed. + // See `snapshot_generation` for more info. + predicate_generation: None, + predicate, + pruning_schema, + file, + partition_fields, + predicate_creation_errors, + }) + } + + pub fn should_prune(&mut self) -> Result<bool> { + let new_generation = snapshot_generation(&self.predicate); + if let Some(current_generation) = self.predicate_generation.as_mut() { + if *current_generation == new_generation { + return Ok(false); + } + *current_generation = new_generation; + } else { + self.predicate_generation = Some(new_generation); + } + let pruning_predicate = build_pruning_predicate( + Arc::clone(&self.predicate), + &self.pruning_schema, + &self.predicate_creation_errors, + ); + if let Some(pruning_predicate) = pruning_predicate { + // The partition column schema is the schema of the table minus the schema of the file + let mut pruning = Box::new(PartitionPruningStatistics::try_new( + vec![self.file.partition_values.clone()], + self.partition_fields.clone(), + )?)
as Box<dyn PruningStatistics>; + if let Some(stats) = &self.file.statistics { + let stats_pruning = Box::new(PrunableStatistics::new( + vec![Arc::clone(stats)], + Arc::clone(&self.pruning_schema), + )); + pruning = Box::new(CompositePruningStatistics::new(vec![ + pruning, + stats_pruning, + ])); + } + match pruning_predicate.prune(pruning.as_ref()) { + Ok(values) => { + assert!(values.len() == 1); + // We expect a single container -> if all containers are false skip this file + if values.into_iter().all(|v| !v) { + return Ok(true); + } + } + // Stats filter array could not be built, so we can't prune + Err(e) => { + debug!("Ignoring error building pruning predicate for file: {e}"); + self.predicate_creation_errors.add(1); + } + } + } + + Ok(false) + } +} diff --git a/datafusion/pruning/src/lib.rs b/datafusion/pruning/src/lib.rs index f49782153d26..cec4fab2262f 100644 --- a/datafusion/pruning/src/lib.rs +++ b/datafusion/pruning/src/lib.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +mod file_pruner; mod pruning_predicate; +pub use file_pruner::FilePruner; pub use pruning_predicate::{ - PredicateRewriter, PruningPredicate, PruningStatistics, RequiredColumns, - UnhandledPredicateHook, + build_pruning_predicate, PredicateRewriter, PruningPredicate, PruningStatistics, + RequiredColumns, UnhandledPredicateHook, }; diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 95dc5888d36b..1551a8f79a7a 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -30,6 +30,7 @@ use arrow::{ }; // pub use for backwards compatibility pub use datafusion_common::pruning::PruningStatistics; +use datafusion_physical_plan::metrics::Count; use log::{debug, trace}; use datafusion_common::error::{DataFusionError, Result}; @@ -376,6 +377,30 @@ pub struct PruningPredicate { literal_guarantees: Vec<LiteralGuarantee>, } +/// Build a pruning predicate from a predicate expression. +/// If the predicate cannot be converted to a pruning predicate, or the resulting +/// pruning predicate is always true (and thus prunes nothing), return None. +/// If there is an error creating the pruning predicate it is recorded by incrementing +/// the `predicate_creation_errors` counter. +pub fn build_pruning_predicate( + predicate: Arc<dyn PhysicalExpr>, + file_schema: &SchemaRef, + predicate_creation_errors: &Count, +) -> Option<Arc<PruningPredicate>> { + match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) { + Ok(pruning_predicate) => { + if !pruning_predicate.always_true() { + return Some(Arc::new(pruning_predicate)); + } + } + Err(e) => { + debug!("Could not create pruning predicate for: {e}"); + predicate_creation_errors.add(1); + } + } + None +} + /// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain /// complex expressions or predicates that reference columns that are not in the /// schema.
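Reviewer note: a minimal sketch of how the new pieces compose, mirroring what `ParquetOpener::open` now does. This is illustrative only and not part of the patch; `predicate`, `logical_file_schema`, `partition_fields`, `file`, `predicate_creation_errors`, and `file_metrics` stand in for values the opener already has in scope, and the `?`/`return` assume the opener's surrounding async context:

use std::sync::Arc;
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
use datafusion_pruning::FilePruner;
use futures::StreamExt;

// Only build a FilePruner when re-checking can pay off: either the predicate is
// dynamic (it may tighten after planning) or the file carries statistics to prune on.
let mut file_pruner = (is_dynamic_physical_expr(&predicate) || file.has_statistics())
    .then(|| {
        FilePruner::new(
            Arc::clone(&predicate),
            &logical_file_schema,
            partition_fields.clone(),
            file.clone(),
            predicate_creation_errors.clone(),
        )
    })
    .transpose()?;

// Cheap to call repeatedly (e.g. between record batches): `should_prune` only
// re-evaluates the pruning predicate when the predicate's snapshot generation
// has changed since the last call.
if let Some(pruner) = file_pruner.as_mut() {
    if pruner.should_prune()? {
        file_metrics.files_ranges_pruned_statistics.add(1);
        return Ok(futures::stream::empty().boxed());
    }
}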