Commit 47b03d9

include predicate in explain plans
1 parent 8c1b98c commit 47b03d9

5 files changed: +175 -12 lines changed
New file: +125

@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion::config::ConfigOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;

async fn create_plan() -> Arc<dyn ExecutionPlan> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
        Field::new("age", DataType::UInt16, true),
        Field::new("salary", DataType::Float64, true),
    ]));
    let batch = RecordBatch::new_empty(schema);

    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let data = {
        let out = BytesMut::new();
        let mut writer =
            ArrowWriter::try_new(out.writer(), batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap().into_inner().freeze()
    };
    store
        .put(&Path::from("test.parquet"), data.into())
        .await
        .unwrap();
    ctx.register_object_store(
        ObjectStoreUrl::parse("memory://").unwrap().as_ref(),
        store,
    );

    ctx.register_parquet("t", "memory://", ParquetReadOptions::default())
        .await
        .unwrap();

    let df = ctx
        .sql(
            r"
            WITH brackets AS (
                SELECT age % 10 AS age_bracket
                FROM t
                GROUP BY age % 10
                HAVING COUNT(*) > 10
            )
            SELECT id, name, age, salary
            FROM t
            JOIN brackets ON t.age % 10 = brackets.age_bracket
            WHERE age > 20 AND t.salary > 1000
            ORDER BY t.salary DESC
            LIMIT 100
        ",
        )
        .await
        .unwrap();

    df.create_physical_plan().await.unwrap()
}

#[derive(Clone)]
struct BenchmarkPlan {
    plan: Arc<dyn ExecutionPlan>,
    config: ConfigOptions,
}

impl std::fmt::Display for BenchmarkPlan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BenchmarkPlan")
    }
}

fn bench_push_down_filter(c: &mut Criterion) {
    // Create a relatively complex plan
    let plan = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(create_plan());
    let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
    let plan = BenchmarkPlan { plan, config };

    c.bench_with_input(
        BenchmarkId::new("push_down_filter", plan.clone()),
        &plan,
        |b, plan| {
            b.iter(|| {
                let optimizer = PushdownFilter::new();
                optimizer
                    .optimize(Arc::clone(&plan.plan), &plan.config)
                    .unwrap();
            });
        },
    );
}

criterion_group!(benches, bench_push_down_filter);
criterion_main!(benches);
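Assuming this file is wired up as a Criterion bench target in the crate's Cargo.toml (the target name is not part of this diff), it can be run locally with cargo bench; Criterion then measures how long PushdownFilter::optimize takes on the physical plan produced by create_plan.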

datafusion/datasource-parquet/src/opener.rs (+3 -3)

@@ -178,7 +178,7 @@ impl FileOpener for ParquetOpener {

         // Build predicates for this specific file
         let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
-            &predicate,
+            predicate.as_ref(),
             &physical_file_schema,
             &predicate_creation_errors,
         );
@@ -390,8 +390,8 @@ pub(crate) fn build_page_pruning_predicate(
     ))
 }

-fn build_pruning_predicates(
-    predicate: &Option<Arc<dyn PhysicalExpr>>,
+pub(crate) fn build_pruning_predicates(
+    predicate: Option<&Arc<dyn PhysicalExpr>>,
     file_schema: &SchemaRef,
     predicate_creation_errors: &Count,
 ) -> (
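The signature change above swaps &Option<Arc<dyn PhysicalExpr>> for Option<&Arc<dyn PhysicalExpr>>, so a caller holding an owned Option adapts with Option::as_ref(), while a caller that already has a reference (like the new explain code in source.rs) can pass Some(&expr) directly. A minimal standalone sketch of the same pattern, using a hypothetical describe function in place of build_pruning_predicates:

// Sketch only: `describe` is a hypothetical stand-in for build_pruning_predicates.
fn describe(predicate: Option<&String>) -> String {
    match predicate {
        Some(p) => format!("predicate={p}"),
        None => String::from("no predicate"),
    }
}

fn main() {
    // A caller holding an owned Option adapts at the call site with as_ref(),
    // mirroring the `predicate.as_ref()` change in ParquetOpener above.
    let owned: Option<String> = Some("b@1 > 2".to_string());
    println!("{}", describe(owned.as_ref()));

    // A caller holding a plain reference passes Some(&value) directly,
    // which is what the new fmt_as code in source.rs does with Some(predicate).
    let value = "a@0 = bar".to_string();
    println!("{}", describe(Some(&value)));
}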

datafusion/datasource-parquet/src/source.rs (+39 -1)

@@ -21,6 +21,7 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;

+use crate::opener::build_pruning_predicates;
 use crate::opener::ParquetOpener;
 use crate::row_filter::can_expr_be_pushed_down_with_schemas;
 use crate::DefaultParquetFileReaderFactory;
@@ -42,9 +43,11 @@ use datafusion_physical_plan::filter_pushdown::filter_pushdown_not_supported;
 use datafusion_physical_plan::filter_pushdown::FilterDescription;
 use datafusion_physical_plan::filter_pushdown::FilterPushdownResult;
 use datafusion_physical_plan::filter_pushdown::FilterPushdownSupport;
+use datafusion_physical_plan::metrics::Count;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;

+use itertools::Itertools;
 use object_store::ObjectStore;

 /// Execution plan for reading one or more Parquet files.
@@ -549,7 +552,42 @@ impl FileSource for ParquetSource {
                     .map(|p| format!(", predicate={p}"))
                     .unwrap_or_default();

-                write!(f, "{}", predicate_string)
+                write!(f, "{}", predicate_string)?;
+
+                // Try to build the pruning predicates.
+                // These are only generated here because it's useful to have *some*
+                // idea of what pushdown is happening when viewing plans.
+                // However it is important to note that these predicates are *not*
+                // necessarily the predicates that are actually evaluated:
+                // the actual predicates are built in reference to the physical schema of
+                // each file, which we do not have at this point and hence cannot use.
+                // Instead we use the logical schema of the file (the table schema without partition columns).
+                match (&self.file_schema, &self.predicate) {
+                    (Some(file_schema), Some(predicate)) => {
+                        let mut predicate_creation_errors = Count::new();
+                        if let (Some(pruning_predicate), _) = build_pruning_predicates(
+                            Some(predicate),
+                            file_schema,
+                            &mut predicate_creation_errors,
+                        ) {
+                            let mut guarantees = pruning_predicate
+                                .literal_guarantees()
+                                .iter()
+                                .map(|item| format!("{}", item))
+                                .collect_vec();
+                            guarantees.sort();
+                            writeln!(
+                                f,
+                                ", pruning_predicate={}, required_guarantees=[{}]",
+                                pruning_predicate.predicate_expr(),
+                                guarantees.join(", ")
+                            )?;
+                        }
+                    }
+                    _ => {}
+                }
+
+                Ok(())
             }
             DisplayFormatType::TreeRender => {
                 if let Some(predicate) = self.predicate() {
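For context on what the new fmt_as branch prints: a PruningPredicate rewrites a row filter into an expression over per-file statistics columns (min/max/null_count/row_count), and literal_guarantees() lists the value sets a column must intersect for the filter to possibly be true. The following is a minimal, self-contained sketch of those two accessors against the public PruningPredicate API, not the committed code; the schema and expression are invented for illustration, and the module paths follow recent DataFusion releases so they may differ slightly by version.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;

fn main() -> Result<()> {
    // Logical (table) schema, playing the role of `self.file_schema`.
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));

    // Physical filter expression `b > 2`, playing the role of `self.predicate`.
    let predicate: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("b", &schema)?, Operator::Gt, lit(2)));

    // Rewrite the row filter into a statistics-based pruning predicate.
    let pruning = PruningPredicate::try_new(predicate, Arc::clone(&schema))?;

    // Same accessors the new display code uses; for this input the output is
    // roughly `b_null_count != row_count AND b_max > 2` with no guarantees.
    println!("pruning_predicate={}", pruning.predicate_expr());
    let mut guarantees: Vec<String> = pruning
        .literal_guarantees()
        .iter()
        .map(|g| g.to_string())
        .collect();
    guarantees.sort();
    println!("required_guarantees=[{}]", guarantees.join(", "));
    Ok(())
}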

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt (+4 -4)

@@ -89,7 +89,7 @@ physical_plan
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]


 # When filter pushdown *is* enabled, ParquetExec can filter exactly,
@@ -117,7 +117,7 @@ physical_plan
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------FilterExec: b@1 > 2, projection=[a@0]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2
+06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]

 # also test querying on columns that are not in all the files
 query T
@@ -139,7 +139,7 @@ physical_plan
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL AND b@1 > 2 AND a@0 IS NOT NULL
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL AND b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2 AND b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[]


 query I
@@ -160,7 +160,7 @@ physical_plan
 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar AND a@0 = bar
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar AND a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1 AND a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)]

 ## cleanup
 statement ok

datafusion/sqllogictest/test_files/repartition_scan.slt (+4 -4)

@@ -61,7 +61,7 @@ logical_plan
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=8192
 02)--FilterExec: column1@0 != 42
-03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42
+03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]

 # disable round robin repartitioning
 statement ok
@@ -77,7 +77,7 @@ logical_plan
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=8192
 02)--FilterExec: column1@0 != 42
-03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42
+03)----DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]

 # enable round robin repartitioning again
 statement ok
@@ -102,7 +102,7 @@ physical_plan
 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------FilterExec: column1@0 != 42
-05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42
+05)--------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]


 ## Read the files as though they are ordered
@@ -138,7 +138,7 @@ physical_plan
 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
 02)--CoalesceBatchesExec: target_batch_size=8192
 03)----FilterExec: column1@0 != 42
-04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42
+04)------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], file_type=parquet, predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)]

 # Cleanup
 statement ok
