Skip to content

Commit f732b1a

Browse files
committed
review suggestions
1 parent 59c5509 commit f732b1a

File tree

12 files changed

+298
-240
lines changed

12 files changed

+298
-240
lines changed

datafusion/core/tests/physical_optimizer/push_down_filter.rs

+15-16
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ use datafusion_physical_expr::{
4444
aggregate::AggregateExprBuilder, conjunction, Partitioning,
4545
};
4646
use datafusion_physical_expr_common::physical_expr::fmt_sql;
47-
use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
47+
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
4848
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4949
use datafusion_physical_plan::filter_pushdown::{
50-
FilterPushdownPropagation, FilterPushdowns,
50+
FilterPushdownPropagation, PredicateSupports,
5151
};
5252
use datafusion_physical_plan::{
5353
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -153,22 +153,21 @@ impl FileSource for TestSource {
153153

154154
fn try_pushdown_filters(
155155
&self,
156-
filters: &[Arc<dyn PhysicalExpr>],
156+
mut filters: Vec<Arc<dyn PhysicalExpr>>,
157157
config: &ConfigOptions,
158158
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
159-
let mut all_filters = filters.iter().map(Arc::clone).collect::<Vec<_>>();
160159
if self.support && config.execution.parquet.pushdown_filters {
161160
if let Some(internal) = self.predicate.as_ref() {
162-
all_filters.push(Arc::clone(internal));
161+
filters.push(Arc::clone(internal));
163162
}
164163
let new_node = Arc::new(TestSource {
165164
support: true,
166-
predicate: Some(conjunction(all_filters.clone())),
165+
predicate: Some(conjunction(filters.clone())),
167166
statistics: self.statistics.clone(), // should be updated in reality
168167
});
169168
Ok(FilterPushdownPropagation {
170-
parent_filter_result: FilterPushdowns::all_supported(&all_filters),
171-
new_node: Some(new_node),
169+
filters: PredicateSupports::all_supported(filters),
170+
updated_node: Some(new_node),
172171
})
173172
} else {
174173
Ok(FilterPushdownPropagation::unsupported(filters))
@@ -196,7 +195,7 @@ fn test_pushdown_into_scan() {
196195

197196
// expect the predicate to be pushed down into the DataSource
198197
insta::assert_snapshot!(
199-
OptimizationTest::new(plan, PushdownFilter{}, true),
198+
OptimizationTest::new(plan, FilterPushdown{}, true),
200199
@r"
201200
OptimizationTest:
202201
input:
@@ -220,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() {
220219
insta::assert_snapshot!(
221220
OptimizationTest::new(
222221
Arc::clone(&plan),
223-
PushdownFilter {},
222+
FilterPushdown {},
224223
false
225224
),
226225
@r"
@@ -239,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() {
239238
insta::assert_snapshot!(
240239
OptimizationTest::new(
241240
plan,
242-
PushdownFilter {},
241+
FilterPushdown {},
243242
true
244243
),
245244
@r"
@@ -264,7 +263,7 @@ fn test_filter_collapse() {
264263
let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
265264

266265
insta::assert_snapshot!(
267-
OptimizationTest::new(plan, PushdownFilter{}, true),
266+
OptimizationTest::new(plan, FilterPushdown{}, true),
268267
@r"
269268
OptimizationTest:
270269
input:
@@ -294,7 +293,7 @@ fn test_filter_with_projection() {
294293

295294
// expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
296295
insta::assert_snapshot!(
297-
OptimizationTest::new(plan, PushdownFilter{}, true),
296+
OptimizationTest::new(plan, FilterPushdown{}, true),
298297
@r"
299298
OptimizationTest:
300299
input:
@@ -318,7 +317,7 @@ fn test_filter_with_projection() {
318317
.unwrap(),
319318
);
320319
insta::assert_snapshot!(
321-
OptimizationTest::new(plan, PushdownFilter{},true),
320+
OptimizationTest::new(plan, FilterPushdown{},true),
322321
@r"
323322
OptimizationTest:
324323
input:
@@ -347,7 +346,7 @@ fn test_push_down_through_transparent_nodes() {
347346

348347
// expect the predicate to be pushed down into the DataSource
349348
insta::assert_snapshot!(
350-
OptimizationTest::new(plan, PushdownFilter{},true),
349+
OptimizationTest::new(plan, FilterPushdown{},true),
351350
@r"
352351
OptimizationTest:
353352
input:
@@ -411,7 +410,7 @@ fn test_no_pushdown_through_aggregates() {
411410

412411
// expect the predicate to be pushed down into the DataSource
413412
insta::assert_snapshot!(
414-
OptimizationTest::new(plan, PushdownFilter{}, true),
413+
OptimizationTest::new(plan, FilterPushdown{}, true),
415414
@r"
416415
OptimizationTest:
417416
input:

datafusion/datasource/src/file.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub trait FileSource: Send + Sync {
102102
/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
103103
fn try_pushdown_filters(
104104
&self,
105-
filters: &[Arc<dyn PhysicalExpr>],
105+
filters: Vec<Arc<dyn PhysicalExpr>>,
106106
_config: &ConfigOptions,
107107
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
108108
Ok(FilterPushdownPropagation::unsupported(filters))

datafusion/datasource/src/file_scan_config.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -594,24 +594,28 @@ impl DataSource for FileScanConfig {
594594

595595
fn try_pushdown_filters(
596596
&self,
597-
filters: &[Arc<dyn PhysicalExpr>],
597+
filters: Vec<Arc<dyn PhysicalExpr>>,
598598
config: &ConfigOptions,
599599
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
600600
let result = self.file_source.try_pushdown_filters(filters, config)?;
601-
match result.new_node {
602-
Some(new_node) => {
603-
let mut new_data_source = self.clone();
604-
new_data_source.file_source = new_node;
601+
match result.updated_node {
602+
Some(new_file_source) => {
603+
let file_scan_config = FileScanConfigBuilder::new(
604+
self.object_store_url.clone(),
605+
Arc::clone(&self.file_schema),
606+
new_file_source,
607+
)
608+
.build();
605609
Ok(FilterPushdownPropagation {
606-
parent_filter_result: result.parent_filter_result,
607-
new_node: Some(Arc::new(new_data_source) as _),
610+
filters: result.filters,
611+
updated_node: Some(Arc::new(file_scan_config) as _),
608612
})
609613
}
610614
None => {
611615
// If the file source does not support filter pushdown, return the original config
612616
Ok(FilterPushdownPropagation {
613-
parent_filter_result: result.parent_filter_result,
614-
new_node: None,
617+
filters: result.filters,
618+
updated_node: None,
615619
})
616620
}
617621
}

datafusion/datasource/src/source.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub trait DataSource: Send + Sync + Debug {
8888
/// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
8989
fn try_pushdown_filters(
9090
&self,
91-
filters: &[Arc<dyn PhysicalExpr>],
91+
filters: Vec<Arc<dyn PhysicalExpr>>,
9292
_config: &ConfigOptions,
9393
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
9494
Ok(FilterPushdownPropagation::unsupported(filters))
@@ -214,23 +214,23 @@ impl ExecutionPlan for DataSourceExec {
214214
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
215215
// Push any remaining filters into our data source
216216
let res = self.data_source.try_pushdown_filters(
217-
&child_pushdown_result.parent_filters.into_inner_filters(),
217+
child_pushdown_result.parent_filters.collect_all(),
218218
config,
219219
)?;
220-
match res.new_node {
220+
match res.updated_node {
221221
Some(data_source) => {
222222
let mut new_node = self.clone();
223223
new_node.data_source = data_source;
224224
new_node.cache =
225225
Self::compute_properties(Arc::clone(&new_node.data_source));
226226
Ok(FilterPushdownPropagation {
227-
parent_filter_result: res.parent_filter_result,
228-
new_node: Some(Arc::new(new_node)),
227+
filters: res.filters,
228+
updated_node: Some(Arc::new(new_node)),
229229
})
230230
}
231231
None => Ok(FilterPushdownPropagation {
232-
parent_filter_result: res.parent_filter_result,
233-
new_node: None,
232+
filters: res.filters,
233+
updated_node: None,
234234
}),
235235
}
236236
}

0 commit comments

Comments
 (0)