Skip to content

Commit 5e59d74

Browse files
committed
review suggestions
1 parent 59c5509 commit 5e59d74

File tree

13 files changed

+271
-221
lines changed

13 files changed

+271
-221
lines changed

datafusion/core/tests/physical_optimizer/push_down_filter.rs

+13-13
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ use datafusion_physical_expr::{
4444
aggregate::AggregateExprBuilder, conjunction, Partitioning,
4545
};
4646
use datafusion_physical_expr_common::physical_expr::fmt_sql;
47-
use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
47+
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
4848
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4949
use datafusion_physical_plan::filter_pushdown::{
50-
FilterPushdownPropagation, FilterPushdowns,
50+
FilterPushdownPropagation, PredicateSupports,
5151
};
5252
use datafusion_physical_plan::{
5353
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -153,7 +153,7 @@ impl FileSource for TestSource {
153153

154154
fn try_pushdown_filters(
155155
&self,
156-
filters: &[Arc<dyn PhysicalExpr>],
156+
filters: Vec<Arc<dyn PhysicalExpr>>,
157157
config: &ConfigOptions,
158158
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
159159
let mut all_filters = filters.iter().map(Arc::clone).collect::<Vec<_>>();
@@ -167,8 +167,8 @@ impl FileSource for TestSource {
167167
statistics: self.statistics.clone(), // should be updated in reality
168168
});
169169
Ok(FilterPushdownPropagation {
170-
parent_filter_result: FilterPushdowns::all_supported(&all_filters),
171-
new_node: Some(new_node),
170+
filters: PredicateSupports::all_supported(all_filters),
171+
updated_node: Some(new_node),
172172
})
173173
} else {
174174
Ok(FilterPushdownPropagation::unsupported(filters))
@@ -196,7 +196,7 @@ fn test_pushdown_into_scan() {
196196

197197
// expect the predicate to be pushed down into the DataSource
198198
insta::assert_snapshot!(
199-
OptimizationTest::new(plan, PushdownFilter{}, true),
199+
OptimizationTest::new(plan, FilterPushdown{}, true),
200200
@r"
201201
OptimizationTest:
202202
input:
@@ -220,7 +220,7 @@ fn test_pushdown_into_scan_with_config_options() {
220220
insta::assert_snapshot!(
221221
OptimizationTest::new(
222222
Arc::clone(&plan),
223-
PushdownFilter {},
223+
FilterPushdown {},
224224
false
225225
),
226226
@r"
@@ -239,7 +239,7 @@ fn test_pushdown_into_scan_with_config_options() {
239239
insta::assert_snapshot!(
240240
OptimizationTest::new(
241241
plan,
242-
PushdownFilter {},
242+
FilterPushdown {},
243243
true
244244
),
245245
@r"
@@ -264,7 +264,7 @@ fn test_filter_collapse() {
264264
let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
265265

266266
insta::assert_snapshot!(
267-
OptimizationTest::new(plan, PushdownFilter{}, true),
267+
OptimizationTest::new(plan, FilterPushdown{}, true),
268268
@r"
269269
OptimizationTest:
270270
input:
@@ -294,7 +294,7 @@ fn test_filter_with_projection() {
294294

295295
// expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
296296
insta::assert_snapshot!(
297-
OptimizationTest::new(plan, PushdownFilter{}, true),
297+
OptimizationTest::new(plan, FilterPushdown{}, true),
298298
@r"
299299
OptimizationTest:
300300
input:
@@ -318,7 +318,7 @@ fn test_filter_with_projection() {
318318
.unwrap(),
319319
);
320320
insta::assert_snapshot!(
321-
OptimizationTest::new(plan, PushdownFilter{},true),
321+
OptimizationTest::new(plan, FilterPushdown{},true),
322322
@r"
323323
OptimizationTest:
324324
input:
@@ -347,7 +347,7 @@ fn test_push_down_through_transparent_nodes() {
347347

348348
// expect the predicate to be pushed down into the DataSource
349349
insta::assert_snapshot!(
350-
OptimizationTest::new(plan, PushdownFilter{},true),
350+
OptimizationTest::new(plan, FilterPushdown{},true),
351351
@r"
352352
OptimizationTest:
353353
input:
@@ -411,7 +411,7 @@ fn test_no_pushdown_through_aggregates() {
411411

412412
// expect the predicate to be pushed down into the DataSource
413413
insta::assert_snapshot!(
414-
OptimizationTest::new(plan, PushdownFilter{}, true),
414+
OptimizationTest::new(plan, FilterPushdown{}, true),
415415
@r"
416416
OptimizationTest:
417417
input:

datafusion/datasource/src/file.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub trait FileSource: Send + Sync {
102102
/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
103103
fn try_pushdown_filters(
104104
&self,
105-
filters: &[Arc<dyn PhysicalExpr>],
105+
filters: Vec<Arc<dyn PhysicalExpr>>,
106106
_config: &ConfigOptions,
107107
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
108108
Ok(FilterPushdownPropagation::unsupported(filters))

datafusion/datasource/src/file_scan_config.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -594,24 +594,24 @@ impl DataSource for FileScanConfig {
594594

595595
fn try_pushdown_filters(
596596
&self,
597-
filters: &[Arc<dyn PhysicalExpr>],
597+
filters: Vec<Arc<dyn PhysicalExpr>>,
598598
config: &ConfigOptions,
599599
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
600600
let result = self.file_source.try_pushdown_filters(filters, config)?;
601-
match result.new_node {
601+
match result.updated_node {
602602
Some(new_node) => {
603603
let mut new_data_source = self.clone();
604604
new_data_source.file_source = new_node;
605605
Ok(FilterPushdownPropagation {
606-
parent_filter_result: result.parent_filter_result,
607-
new_node: Some(Arc::new(new_data_source) as _),
606+
filters: result.filters,
607+
updated_node: Some(Arc::new(new_data_source) as _),
608608
})
609609
}
610610
None => {
611611
// If the file source does not support filter pushdown, return the original config
612612
Ok(FilterPushdownPropagation {
613-
parent_filter_result: result.parent_filter_result,
614-
new_node: None,
613+
filters: result.filters,
614+
updated_node: None,
615615
})
616616
}
617617
}

datafusion/datasource/src/source.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub trait DataSource: Send + Sync + Debug {
8888
/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
8989
fn try_pushdown_filters(
9090
&self,
91-
filters: &[Arc<dyn PhysicalExpr>],
91+
filters: Vec<Arc<dyn PhysicalExpr>>,
9292
_config: &ConfigOptions,
9393
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
9494
Ok(FilterPushdownPropagation::unsupported(filters))
@@ -214,23 +214,23 @@ impl ExecutionPlan for DataSourceExec {
214214
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
215215
// Push any remaining filters into our data source
216216
let res = self.data_source.try_pushdown_filters(
217-
&child_pushdown_result.parent_filters.into_inner_filters(),
217+
child_pushdown_result.parent_filters.collect_all(),
218218
config,
219219
)?;
220-
match res.new_node {
220+
match res.updated_node {
221221
Some(data_source) => {
222222
let mut new_node = self.clone();
223223
new_node.data_source = data_source;
224224
new_node.cache =
225225
Self::compute_properties(Arc::clone(&new_node.data_source));
226226
Ok(FilterPushdownPropagation {
227-
parent_filter_result: res.parent_filter_result,
228-
new_node: Some(Arc::new(new_node)),
227+
filters: res.filters,
228+
updated_node: Some(Arc::new(new_node)),
229229
})
230230
}
231231
None => Ok(FilterPushdownPropagation {
232-
parent_filter_result: res.parent_filter_result,
233-
new_node: None,
232+
filters: res.filters,
233+
updated_node: None,
234234
}),
235235
}
236236
}

0 commit comments

Comments
 (0)