refactor filter pushdown apis #15801

Open · wants to merge 11 commits into base: main
67 changes: 32 additions & 35 deletions datafusion/core/tests/physical_optimizer/push_down_filter.rs
@@ -44,11 +44,10 @@ use datafusion_physical_expr::{
     aggregate::AggregateExprBuilder, conjunction, Partitioning,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
-use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
+use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
+    FilterPushdownPropagation, PredicateSupports,
 };
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -154,29 +153,24 @@ impl FileSource for TestSource {
 
     fn try_pushdown_filters(
         &self,
-        mut fd: FilterDescription,
+        mut filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
         if self.support && config.execution.parquet.pushdown_filters {
             if let Some(internal) = self.predicate.as_ref() {
-                fd.filters.push(Arc::clone(internal));
+                filters.push(Arc::clone(internal));
             }
-            let all_filters = fd.take_description();
-
-            Ok(FilterPushdownResult {
-                support: FilterPushdownSupport::Supported {
-                    child_descriptions: vec![],
-                    op: Arc::new(TestSource {
-                        support: true,
-                        predicate: Some(conjunction(all_filters)),
-                        statistics: self.statistics.clone(), // should be updated in reality
-                    }),
-                    revisit: false,
-                },
-                remaining_description: FilterDescription::empty(),
+            let new_node = Arc::new(TestSource {
+                support: true,
+                predicate: Some(conjunction(filters.clone())),
+                statistics: self.statistics.clone(), // should be updated in reality
+            });
+            Ok(FilterPushdownPropagation {
+                filters: PredicateSupports::all_supported(filters),
+                updated_node: Some(new_node),
             })
         } else {
-            Ok(filter_pushdown_not_supported(fd))
+            Ok(FilterPushdownPropagation::unsupported(filters))
         }
     }
 }
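For readers new to this PR, the shape of the refactor is easier to see outside the diff: `try_pushdown_filters` now takes a plain `Vec` of predicates and answers with a `FilterPushdownPropagation` that pairs a per-filter verdict with an optional rebuilt node. Below is a self-contained toy model of that contract, not DataFusion code: strings stand in for `PhysicalExpr`, a `bool` per filter stands in for `PredicateSupports`, and all names other than those visible in the diff are made up.

```rust
// Toy model of the new pushdown contract; not the DataFusion types.
struct Propagation<T> {
    supported: Vec<bool>,    // one verdict per filter handed down by the parent
    updated_node: Option<T>, // replacement node, if pushdown changed anything
}

struct ToySource {
    support: bool,
    predicate: Option<String>,
}

fn try_pushdown_filters(src: &ToySource, mut filters: Vec<String>) -> Propagation<ToySource> {
    if src.support {
        // Fold any pre-existing predicate in, like the test source above does.
        if let Some(existing) = &src.predicate {
            filters.push(existing.clone());
        }
        let new_node = ToySource {
            support: true,
            predicate: Some(filters.join(" AND ")), // stand-in for conjunction()
        };
        Propagation {
            supported: vec![true; filters.len()],
            updated_node: Some(new_node),
        }
    } else {
        // Mirrors FilterPushdownPropagation::unsupported: nothing is absorbed.
        Propagation {
            supported: vec![false; filters.len()],
            updated_node: None,
        }
    }
}

fn main() {
    let src = ToySource { support: true, predicate: None };
    let res = try_pushdown_filters(&src, vec!["a = foo".into(), "b = bar".into()]);
    assert_eq!(res.supported, vec![true, true]);
    assert_eq!(res.updated_node.unwrap().predicate.unwrap(), "a = foo AND b = bar");
}
```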
@@ -201,7 +195,7 @@ fn test_pushdown_into_scan() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
@@ -225,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             Arc::clone(&plan),
-            PushdownFilter {},
+            FilterPushdown {},
             false
         ),
         @r"
@@ -244,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             plan,
-            PushdownFilter {},
+            FilterPushdown {},
             true
         ),
         @r"
@@ -269,7 +263,7 @@ fn test_filter_collapse() {
     let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
 
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
@@ -278,7 +272,7 @@ fn test_filter_collapse() {
                 - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
           output:
             Ok:
-              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar

> Review comment (Contributor): This actually looks like an improvement to me, as now `a = foo` will be evaluated before `b = bar`, as was done in the input plan. This might be important for short-circuiting. The prior version of this optimization seems to have reordered them.

         "
     );
 }
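The short-circuiting point in the review comment is easy to demonstrate: when a conjunction is evaluated left to right, a failing first predicate means the second is never evaluated, so preserving the written order keeps the first (often cheapest or most selective) predicate as the gate. A minimal, self-contained illustration follows; it shows row-at-a-time `&&` short-circuiting, which is not necessarily how DataFusion evaluates batched predicates:

```rust
// Demonstrates why predicate order can matter under short-circuit evaluation.
fn main() {
    let expensive_calls = std::cell::Cell::new(0);

    let a_eq_foo = |row: (&str, &str)| row.0 == "foo"; // cheap check
    let b_eq_bar = |row: (&str, &str)| {
        expensive_calls.set(expensive_calls.get() + 1); // pretend this is costly
        row.1 == "bar"
    };

    let rows = [("foo", "bar"), ("baz", "bar"), ("qux", "quux")];
    // `&&` short-circuits: b_eq_bar only runs for rows that pass a_eq_foo.
    let kept = rows.iter().filter(|&&r| a_eq_foo(r) && b_eq_bar(r)).count();

    assert_eq!(kept, 1);
    assert_eq!(expensive_calls.get(), 1); // evaluated once, not three times
}
```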
@@ -288,25 +282,28 @@ fn test_filter_with_projection() {
     let scan = test_scan(true);
     let projection = vec![1, 0];
     let predicate = col_lit_predicate("a", "foo", schema());
-    let plan = Arc::new(
-        FilterExec::try_new(predicate, Arc::clone(&scan))
+    let filter = Arc::new(
+        FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&scan))
             .unwrap()
             .with_projection(Some(projection))
             .unwrap(),
     );
+    let predicate = col_lit_predicate("b", "bar", &filter.schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, filter).unwrap());
 
     // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
-            - FilterExec: a@0 = foo, projection=[b@1, a@0]
-              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
+            - FilterExec: b@0 = bar
+              - FilterExec: a@0 = foo, projection=[b@1, a@0]
+                - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
           output:
             Ok:
               - ProjectionExec: expr=[b@1 as b, a@0 as a]
-                - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+                - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
         ",
     );
 
@@ -320,7 +317,7 @@ fn test_filter_with_projection() {
             .unwrap(),
     );
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{},true),
+        OptimizationTest::new(plan, FilterPushdown{},true),
         @r"
         OptimizationTest:
           input:
@@ -349,7 +346,7 @@ fn test_push_down_through_transparent_nodes() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{},true),
+        OptimizationTest::new(plan, FilterPushdown{},true),
         @r"
         OptimizationTest:
           input:
@@ -362,7 +359,7 @@ fn test_push_down_through_transparent_nodes() {
             Ok:
               - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0
                 - CoalesceBatchesExec: target_batch_size=1
-                  - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+                  - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
         "
     );
 }
@@ -413,7 +410,7 @@ fn test_no_pushdown_through_aggregates() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
16 changes: 7 additions & 9 deletions datafusion/datasource/src/file.rs
@@ -28,10 +28,8 @@ use crate::file_stream::FileOpener;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Result, Statistics};
-use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
@@ -108,14 +106,14 @@ pub trait FileSource: Send + Sync {
     }
 
     /// Try to push down filters into this FileSource.
-    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
-    /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
-        Ok(filter_pushdown_not_supported(fd))
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        Ok(FilterPushdownPropagation::unsupported(filters))
     }
 }
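The changed default body keeps the trait backwards-compatible: any `FileSource` implementation that does not override `try_pushdown_filters` automatically reports every filter as unsupported. A toy sketch of that default-method pattern follows; the trait, types, and source names here are simplified stand-ins, not the DataFusion definitions:

```rust
// Simplified stand-in for the default-method pattern used by FileSource.
trait Source {
    // Returns one verdict per filter plus an optional rebuilt predicate.
    fn try_pushdown_filters(&self, filters: Vec<String>) -> (Vec<bool>, Option<String>) {
        // Conservative default: decline everything, change nothing.
        (vec![false; filters.len()], None)
    }
}

struct PlainSource; // relies on the default, like formats without pushdown
struct PushdownSource; // overrides the default, like a Parquet-style source

impl Source for PlainSource {}

impl Source for PushdownSource {
    fn try_pushdown_filters(&self, filters: Vec<String>) -> (Vec<bool>, Option<String>) {
        // Accept everything and fold the filters into one predicate.
        (vec![true; filters.len()], Some(filters.join(" AND ")))
    }
}

fn main() {
    let (verdicts, node) = PlainSource.try_pushdown_filters(vec!["a = 1".into()]);
    assert_eq!((verdicts, node), (vec![false], None));

    let (verdicts, node) =
        PushdownSource.try_pushdown_filters(vec!["a = 1".into(), "b = 2".into()]);
    assert_eq!(verdicts, vec![true, true]);
    assert_eq!(node.as_deref(), Some("a = 1 AND b = 2"));
}
```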
57 changes: 22 additions & 35 deletions datafusion/datasource/src/file_scan_config.rs
@@ -48,14 +48,12 @@ use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
+use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr::{
     expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
     PhysicalSortExpr,
 };
-use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
-};
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     metrics::ExecutionPlanMetricsSet,
@@ -596,40 +594,29 @@ impl DataSource for FileScanConfig {
 
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
-        let FilterPushdownResult {
-            support,
-            remaining_description,
-        } = self.file_source.try_pushdown_filters(fd, config)?;
-
-        match support {
-            FilterPushdownSupport::Supported {
-                child_descriptions,
-                op,
-                revisit,
-            } => {
-                let new_data_source = Arc::new(
-                    FileScanConfigBuilder::from(self.clone())
-                        .with_source(op)
-                        .build(),
-                );
-
-                debug_assert!(child_descriptions.is_empty());
-                debug_assert!(!revisit);
-
-                Ok(FilterPushdownResult {
-                    support: FilterPushdownSupport::Supported {
-                        child_descriptions,
-                        op: new_data_source,
-                        revisit,
-                    },
-                    remaining_description,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
+        let result = self.file_source.try_pushdown_filters(filters, config)?;
+        match result.updated_node {
+            Some(new_file_source) => {
+                let file_scan_config = FileScanConfigBuilder::new(
+                    self.object_store_url.clone(),
+                    Arc::clone(&self.file_schema),
+                    new_file_source,
+                )
+                .build();
+                Ok(FilterPushdownPropagation {
+                    filters: result.filters,
+                    updated_node: Some(Arc::new(file_scan_config) as _),
                 })
             }
-            FilterPushdownSupport::NotSupported => {
-                Ok(filter_pushdown_not_supported(remaining_description))
+            None => {
+                // If the file source does not support filter pushdown, return the original config
+                Ok(FilterPushdownPropagation {
+                    filters: result.filters,
+                    updated_node: None,
+                })
             }
         }
     }
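The wrapper pattern here is worth calling out: `FileScanConfig` does no filter analysis of its own. It forwards the filters to its inner file source, passes the per-filter verdicts through unchanged, and only rebuilds itself when the inner source produced an updated node. A self-contained toy of that delegation follows; all names are illustrative, not DataFusion's:

```rust
// Toy model of the delegation in FileScanConfig::try_pushdown_filters.
struct Propagation<T> {
    supported: Vec<bool>,
    updated_node: Option<T>,
}

struct Inner {
    predicate: Option<String>,
}

struct Wrapper {
    inner: Inner,
}

impl Inner {
    fn try_pushdown_filters(&self, filters: Vec<String>) -> Propagation<Inner> {
        // This inner source accepts everything and rebuilds itself.
        Propagation {
            supported: vec![true; filters.len()],
            updated_node: Some(Inner {
                predicate: Some(filters.join(" AND ")),
            }),
        }
    }
}

impl Wrapper {
    fn try_pushdown_filters(&self, filters: Vec<String>) -> Propagation<Wrapper> {
        let res = self.inner.try_pushdown_filters(filters);
        Propagation {
            // The wrapper never changes the verdicts; it only re-wraps the node.
            supported: res.supported,
            updated_node: res.updated_node.map(|inner| Wrapper { inner }),
        }
    }
}

fn main() {
    let w = Wrapper { inner: Inner { predicate: None } };
    let res = w.try_pushdown_filters(vec!["a = 1".into(), "b = 2".into()]);
    assert_eq!(res.supported, vec![true, true]);
    assert!(res.updated_node.is_some());
}
```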
65 changes: 29 additions & 36 deletions datafusion/datasource/src/source.rs
@@ -33,11 +33,10 @@ use crate::file_scan_config::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
+    ChildPushdownResult, FilterPushdownPropagation,
 };
 
 /// A source of data, typically a list of files or memory
@@ -95,13 +94,15 @@ pub trait DataSource: Send + Sync + Debug {
         _projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
     /// Try to push down filters into this DataSource.
-    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
+    ///
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
-        Ok(filter_pushdown_not_supported(fd))
+    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
+        Ok(FilterPushdownPropagation::unsupported(filters))
     }
 }
 
@@ -237,39 +238,31 @@ impl ExecutionPlan for DataSourceExec {
         self.data_source.try_swapping_with_projection(projection)
     }
 
-    fn try_pushdown_filters(
+    fn handle_child_pushdown_result(
         &self,
-        fd: FilterDescription,
+        child_pushdown_result: ChildPushdownResult,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        let FilterPushdownResult {
-            support,
-            remaining_description,
-        } = self.data_source.try_pushdown_filters(fd, config)?;
-
-        match support {
-            FilterPushdownSupport::Supported {
-                child_descriptions,
-                op,
-                revisit,
-            } => {
-                let new_exec = Arc::new(DataSourceExec::new(op));
-
-                debug_assert!(child_descriptions.is_empty());
-                debug_assert!(!revisit);
-
-                Ok(FilterPushdownResult {
-                    support: FilterPushdownSupport::Supported {
-                        child_descriptions,
-                        op: new_exec,
-                        revisit,
-                    },
-                    remaining_description,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        // Push any remaining filters into our data source
+        let res = self.data_source.try_pushdown_filters(
+            child_pushdown_result.parent_filters.collect_all(),
+            config,
+        )?;
+        match res.updated_node {
+            Some(data_source) => {
+                let mut new_node = self.clone();
+                new_node.data_source = data_source;
+                new_node.cache =
+                    Self::compute_properties(Arc::clone(&new_node.data_source));
+                Ok(FilterPushdownPropagation {
+                    filters: res.filters,
+                    updated_node: Some(Arc::new(new_node)),
                 })
             }
-            FilterPushdownSupport::NotSupported => {
-                Ok(filter_pushdown_not_supported(remaining_description))
-            }
+            None => Ok(FilterPushdownPropagation {
+                filters: res.filters,
+                updated_node: None,
+            }),
         }
     }
 }
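One detail in this hunk deserves attention when writing similar nodes: after swapping in the updated data source, the exec's cached plan properties are recomputed rather than carried over stale. A toy model of that clone, swap, and recompute step follows; the `usize` cache is a stand-in for `PlanProperties`, and everything else here is illustrative:

```rust
// Toy model of DataSourceExec::handle_child_pushdown_result's node rebuild.
#[derive(Clone)]
struct Source {
    rows: usize,
}

#[derive(Clone)]
struct Exec {
    source: Source,
    cache: usize, // stand-in for cached PlanProperties
}

impl Exec {
    fn compute_properties(source: &Source) -> usize {
        source.rows // pretend the properties are derived from the source
    }

    fn with_new_source(&self, source: Source) -> Exec {
        let mut new_node = self.clone();
        new_node.source = source;
        // Recompute the cache; reusing the stale value would be a bug.
        new_node.cache = Exec::compute_properties(&new_node.source);
        new_node
    }
}

fn main() {
    let exec = Exec { source: Source { rows: 10 }, cache: 10 };
    let updated = exec.with_new_source(Source { rows: 3 });
    assert_eq!(updated.cache, 3);
}
```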