From 0071b1052ca1e44617223d3adaf69898a55ae735 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 00:11:07 -0700 Subject: [PATCH 01/19] refactor filter pushdown apis --- .../physical_optimizer/push_down_filter.rs | 33 +-- datafusion/datasource/src/file.rs | 12 +- datafusion/datasource/src/file_scan_config.rs | 93 +++--- datafusion/datasource/src/source.rs | 61 ++-- .../src/push_down_filter.rs | 265 ++++++++---------- .../physical-plan/src/coalesce_batches.rs | 22 +- .../physical-plan/src/execution_plan.rs | 50 ++-- datafusion/physical-plan/src/filter.rs | 150 +++++++--- .../physical-plan/src/filter_pushdown.rs | 227 +++++++++++---- .../physical-plan/src/repartition/mod.rs | 21 +- 10 files changed, 544 insertions(+), 390 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index b19144f1bcff..3d1e832b5a45 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -47,8 +47,7 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::push_down_filter::PushdownFilter; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, - FilterPushdownSupport, + FilterPushdownPropagation, FilterPushdowns, }; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, @@ -154,29 +153,25 @@ impl FileSource for TestSource { fn try_pushdown_filters( &self, - mut fd: FilterDescription, + filters: &[Arc], config: &ConfigOptions, - ) -> Result>> { + ) -> Result>> { + let mut all_filters = filters.iter().map(Arc::clone).collect::>(); if self.support && config.execution.parquet.pushdown_filters { if let Some(internal) = self.predicate.as_ref() { - fd.filters.push(Arc::clone(internal)); + all_filters.push(Arc::clone(internal)); } - let all_filters = fd.take_description(); - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions: vec![], - op: Arc::new(TestSource { - support: true, - predicate: Some(conjunction(all_filters)), - statistics: self.statistics.clone(), // should be updated in reality - }), - revisit: false, - }, - remaining_description: FilterDescription::empty(), + let new_node = Arc::new(TestSource { + support: true, + predicate: Some(conjunction(all_filters.clone())), + statistics: self.statistics.clone(), // should be updated in reality + }); + Ok(FilterPushdownPropagation { + parent_filter_result: FilterPushdowns::all_supported(&all_filters), + new_node: Some(new_node), }) } else { - Ok(filter_pushdown_not_supported(fd)) + Ok(FilterPushdownPropagation::unsupported(filters)) } } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 6bbe9e50d3b0..5d6eff9d1903 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -28,10 +28,8 @@ use crate::file_stream::FileOpener; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, Statistics}; -use datafusion_physical_expr::LexOrdering; -use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, -}; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -113,9 +111,9 @@ pub trait FileSource: Send + Sync { /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters fn try_pushdown_filters( &self, - fd: FilterDescription, + filters: &[Arc], _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_not_supported(fd)) + ) -> Result>> { + Ok(FilterPushdownPropagation::unsupported(filters)) } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fb756cc11fbb..8d0ef19d31c5 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -48,14 +48,12 @@ use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::{ expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalSortExpr, }; -use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, - FilterPushdownSupport, -}; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, metrics::ExecutionPlanMetricsSet, @@ -596,43 +594,68 @@ impl DataSource for FileScanConfig { fn try_pushdown_filters( &self, - fd: FilterDescription, + filters: &[Arc], config: &ConfigOptions, - ) -> Result>> { - let FilterPushdownResult { - support, - remaining_description, - } = self.file_source.try_pushdown_filters(fd, config)?; - - match support { - FilterPushdownSupport::Supported { - child_descriptions, - op, - revisit, - } => { - let new_data_source = Arc::new( - FileScanConfigBuilder::from(self.clone()) - .with_source(op) - .build(), - ); - - debug_assert!(child_descriptions.is_empty()); - debug_assert!(!revisit); - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: new_data_source, - revisit, - }, - remaining_description, + ) -> Result>> { + let result = self.file_source.try_pushdown_filters(filters, config)?; + match result.new_node { + Some(new_node) => { + let mut new_data_source = self.clone(); + new_data_source.file_source = new_node; + Ok(FilterPushdownPropagation { + parent_filter_result: result.parent_filter_result, + new_node: Some(Arc::new(new_data_source) as _), }) } - FilterPushdownSupport::NotSupported => { - Ok(filter_pushdown_not_supported(remaining_description)) + None => { + // If the file source does not support filter pushdown, return the original config + Ok(FilterPushdownPropagation { + parent_filter_result: result.parent_filter_result, + new_node: None, + }) } } } + + // fn try_pushdown_filters( + // &self, + // parent_filters: &[Arc], + // config: &ConfigOptions, + // ) -> Result>> { + // // let FilterPushdownResult { + // // support, + // // remaining_description, + // // } = self.file_source.try_pushdown_filters(fd, config)?; + + // // match support { + // // FilterPushdownSupport::Supported { + // // child_descriptions, + // // op, + // // revisit, + // // } => { + // // let new_data_source = Arc::new( + // // FileScanConfigBuilder::from(self.clone()) + // // .with_source(op) + // // .build(), + // // ); + + // // debug_assert!(child_descriptions.is_empty()); + // // debug_assert!(!revisit); + + // // Ok(FilterPushdownResult { + // // support: FilterPushdownSupport::Supported { + // // child_descriptions, + // // op: new_data_source, + // // revisit, + // // }, + // // remaining_description, + // // }) + // // } + // // FilterPushdownSupport::NotSupported => { + // // Ok(filter_pushdown_not_supported(remaining_description)) + // // } + // // } + // } } impl FileScanConfig { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 022f77f2e421..622eabe6b6ce 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -33,11 +33,10 @@ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, - FilterPushdownSupport, + ChildPushdownResult, FilterPushdownPropagation, }; /// A source of data, typically a list of files or memory @@ -98,10 +97,10 @@ pub trait DataSource: Send + Sync + Debug { /// See [`ExecutionPlan::try_pushdown_filters`] for more details. fn try_pushdown_filters( &self, - fd: FilterDescription, + filters: &[Arc], _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_not_supported(fd)) + ) -> Result>> { + Ok(FilterPushdownPropagation::unsupported(filters)) } } @@ -237,39 +236,31 @@ impl ExecutionPlan for DataSourceExec { self.data_source.try_swapping_with_projection(projection) } - fn try_pushdown_filters( + fn handle_child_pushdown_result( &self, - fd: FilterDescription, + child_pushdown_result: ChildPushdownResult, config: &ConfigOptions, - ) -> Result>> { - let FilterPushdownResult { - support, - remaining_description, - } = self.data_source.try_pushdown_filters(fd, config)?; - - match support { - FilterPushdownSupport::Supported { - child_descriptions, - op, - revisit, - } => { - let new_exec = Arc::new(DataSourceExec::new(op)); - - debug_assert!(child_descriptions.is_empty()); - debug_assert!(!revisit); - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: new_exec, - revisit, - }, - remaining_description, + ) -> Result>> { + // Push any remaining filters into our data source + let res = self.data_source.try_pushdown_filters( + &child_pushdown_result.parent_filters.unpack(), + config, + )?; + match res.new_node { + Some(data_source) => { + let mut new_node = self.clone(); + new_node.data_source = data_source; + new_node.cache = + Self::compute_properties(Arc::clone(&new_node.data_source)); + Ok(FilterPushdownPropagation { + parent_filter_result: res.parent_filter_result, + new_node: Some(Arc::new(new_node)), }) } - FilterPushdownSupport::NotSupported => { - Ok(filter_pushdown_not_supported(remaining_description)) - } + None => Ok(FilterPushdownPropagation { + parent_filter_result: res.parent_filter_result, + new_node: None, + }), } } } diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/push_down_filter.rs index 80201454d06d..6c73ade6f79b 100644 --- a/datafusion/physical-optimizer/src/push_down_filter.rs +++ b/datafusion/physical-optimizer/src/push_down_filter.rs @@ -19,15 +19,13 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{config::ConfigOptions, Result}; -use datafusion_physical_expr::conjunction; -use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - FilterDescription, FilterPushdownResult, FilterPushdownSupport, + ChildPushdownResult, FilterPushdown, FilterPushdownPropagation, FilterPushdowns, }; -use datafusion_physical_plan::tree_node::PlanContext; -use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use itertools::izip; /// Attempts to recursively push given filters from the top of the tree into leafs. /// @@ -363,14 +361,123 @@ use datafusion_physical_plan::ExecutionPlan; /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec #[derive(Debug)] pub struct PushdownFilter {} - impl Default for PushdownFilter { fn default() -> Self { Self::new() } } -pub type FilterDescriptionContext = PlanContext; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FilterPushdownState { + NoChildren, + Unsupported, + Supported, +} + +fn push_down_filters( + node: &Arc, + parent_filters: Vec>, + config: &ConfigOptions, +) -> Result>> { + let pushdown_plan = node.gather_filters_for_pushdown(&parent_filters, config)?; + let children = node + .children() + .into_iter() + .map(Arc::clone) + .collect::>(); + let mut parent_pushdown_result = + vec![FilterPushdownState::NoChildren; parent_filters.len()]; + let mut self_filter_pushdown_result = vec![]; + let mut new_children = vec![]; + for (child, parent_filters, mut filters_for_child) in izip!( + children, + pushdown_plan.parent_filters_for_children, + pushdown_plan.self_filters_for_children + ) { + let mut parent_filter_indices = vec![]; + let num_self_filters = filters_for_child.len(); + for (idx, filter) in parent_filters.into_iter().enumerate() { + // Check if we can push this filter down to our children + match filter { + FilterPushdown::Supported(f) => { + // Queue this filter up for pushdown to this child + filters_for_child.push(f); + parent_filter_indices.push(idx); + // Mark this filter as supported by our children if no child has marked it as unsupported + if matches!( + parent_pushdown_result[idx], + FilterPushdownState::NoChildren + ) { + parent_pushdown_result[idx] = FilterPushdownState::Supported; + } + } + FilterPushdown::Unsupported(_) => { + // Mark as unsupported by our children + parent_pushdown_result[idx] = FilterPushdownState::Unsupported; + } + } + } + // Any filters that could not be pushed down to a child are marked as not-supported to our parents + let result = push_down_filters(&child, filters_for_child, config)?; + if let Some(new_child) = result.new_node { + // If we have a filter pushdown result, we need to update our children + new_children.push(new_child); + } else { + // If we don't have a filter pushdown result, we need to update our children + new_children.push(child); + } + // Our child doesn't know the difference between filters that were passed down from our parents + // and filters that the current node injected. + // We need to de-entangle this since we do need to distinguish between them. + let parent_filters = result.parent_filter_result.into_inner(); + let (self_filters, parent_filters) = parent_filters.split_at(num_self_filters); + self_filter_pushdown_result + .push(FilterPushdowns::new(self_filters.to_vec())); + for (idx, result) in parent_filter_indices.iter().zip(parent_filters) { + let current_node_state = match result { + FilterPushdown::Supported(_) => FilterPushdownState::Supported, + FilterPushdown::Unsupported(_) => FilterPushdownState::Unsupported, + }; + match (current_node_state, parent_pushdown_result[*idx]) { + (r, FilterPushdownState::NoChildren) => { + // If we have no result, use the current state from this child + parent_pushdown_result[*idx] = r; + } + (FilterPushdownState::Supported, FilterPushdownState::Supported) => { + // If the current child and all previous children are supported the filter continue to support it + parent_pushdown_result[*idx] = FilterPushdownState::Supported; + } + _ => { + // Either the current child or a previous child marked this filter as unsupported + parent_pushdown_result[*idx] = FilterPushdownState::Unsupported; + } + } + } + } + // Re-create this node with new children + let node = with_new_children_if_necessary(Arc::clone(node), new_children)?; + // Remap the result onto the parent filters as they were given to us. + // Any filters that were not pushed down to any children are marked as unsupported. + let parent_pushdown_result = FilterPushdowns::new( + parent_pushdown_result + .into_iter() + .zip(parent_filters) + .map(|(state, filter)| match state { + FilterPushdownState::NoChildren => FilterPushdown::Unsupported(filter), + FilterPushdownState::Unsupported => FilterPushdown::Unsupported(filter), + FilterPushdownState::Supported => FilterPushdown::Supported(filter), + }) + .collect(), + ); + // Check what the current node wants to do given the result of pushdown to it's children + Arc::clone(&node).handle_child_pushdown_result( + ChildPushdownResult { + parent_filters: parent_pushdown_result, + self_filters: self_filter_pushdown_result, + }, + config, + ) +} impl PhysicalOptimizerRule for PushdownFilter { fn optimize( @@ -378,31 +485,9 @@ impl PhysicalOptimizerRule for PushdownFilter { plan: Arc, config: &ConfigOptions, ) -> Result> { - let context = FilterDescriptionContext::new_default(plan); - - context - .transform_up(|node| { - if node.plan.as_any().downcast_ref::().is_some() { - let initial_plan = Arc::clone(&node.plan); - let mut accept_updated = false; - let updated_node = node.transform_down(|filter_node| { - Self::try_pushdown(filter_node, config, &mut accept_updated) - }); - - if accept_updated { - updated_node - } else { - Ok(Transformed::no(FilterDescriptionContext::new_default( - initial_plan, - ))) - } - } - // Other filter introducing operators extends here - else { - Ok(Transformed::no(node)) - } - }) - .map(|updated| updated.data.plan) + Ok(push_down_filters(&plan, vec![], config)? + .new_node + .unwrap_or(Arc::clone(&plan))) } fn name(&self) -> &str { @@ -418,118 +503,4 @@ impl PushdownFilter { pub fn new() -> Self { Self {} } - - fn try_pushdown( - mut node: FilterDescriptionContext, - config: &ConfigOptions, - accept_updated: &mut bool, - ) -> Result> { - let initial_description = FilterDescription { - filters: node.data.take_description(), - }; - - let FilterPushdownResult { - support, - remaining_description, - } = node - .plan - .try_pushdown_filters(initial_description, config)?; - - match support { - FilterPushdownSupport::Supported { - mut child_descriptions, - op, - revisit, - } => { - if revisit { - // This check handles cases where the current operator is entirely removed - // from the plan and replaced with its child. In such cases, to not skip - // over the new node, we need to explicitly re-apply this pushdown logic - // to the new node. - // - // TODO: If TreeNodeRecursion supports a Revisit mechanism in the future, - // this manual recursion could be removed. - - // If the operator is removed, it should not leave any filters as remaining - debug_assert!(remaining_description.filters.is_empty()); - // Operators having 2 children cannot be removed - debug_assert_eq!(child_descriptions.len(), 1); - debug_assert_eq!(node.children.len(), 1); - - node.plan = op; - node.data = child_descriptions.swap_remove(0); - node.children = node.children.swap_remove(0).children; - Self::try_pushdown(node, config, accept_updated) - } else { - if remaining_description.filters.is_empty() { - // Filter can be pushed down safely - node.plan = op; - if node.children.is_empty() { - *accept_updated = true; - } else { - for (child, descr) in - node.children.iter_mut().zip(child_descriptions) - { - child.data = descr; - } - } - } else { - // Filter cannot be pushed down - node = insert_filter_exec( - node, - child_descriptions, - remaining_description, - )?; - } - Ok(Transformed::yes(node)) - } - } - FilterPushdownSupport::NotSupported => { - if remaining_description.filters.is_empty() { - Ok(Transformed { - data: node, - transformed: false, - tnr: TreeNodeRecursion::Stop, - }) - } else { - node = insert_filter_exec( - node, - vec![FilterDescription::empty(); 1], - remaining_description, - )?; - Ok(Transformed { - data: node, - transformed: true, - tnr: TreeNodeRecursion::Stop, - }) - } - } - } - } -} - -fn insert_filter_exec( - node: FilterDescriptionContext, - mut child_descriptions: Vec, - remaining_description: FilterDescription, -) -> Result { - let mut new_child_node = node; - - // Filter has one child - if !child_descriptions.is_empty() { - debug_assert_eq!(child_descriptions.len(), 1); - new_child_node.data = child_descriptions.swap_remove(0); - } - let new_plan = Arc::new(FilterExec::try_new( - conjunction(remaining_description.filters), - Arc::clone(&new_child_node.plan), - )?); - let new_children = vec![new_child_node]; - let new_data = FilterDescription::empty(); - - Ok(FilterDescriptionContext::new( - new_plan, - new_data, - new_children, - )) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 34b3f1b0241b..4f0476e8d772 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -32,11 +32,12 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - filter_pushdown_transparent, FilterDescription, FilterPushdownResult, + ChildPushdownResult, FilterPushdownPlan, FilterPushdownPropagation, }; use datafusion_common::config::ConfigOptions; use futures::ready; @@ -226,14 +227,21 @@ impl ExecutionPlan for CoalesceBatchesExec { CardinalityEffect::Equal } - fn try_pushdown_filters( + fn gather_filters_for_pushdown( &self, - fd: FilterDescription, + parent_filters: &[Arc], _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_transparent::>( - Arc::new(self.clone()), - fd, + ) -> Result { + Ok(FilterPushdownPlan::all_supported(parent_filters, 1)) + } + + fn handle_child_pushdown_result( + &self, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, )) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 9551c2b1743e..58428b49e658 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,7 +17,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - filter_pushdown_not_supported, FilterDescription, FilterPushdownResult, + ChildPushdownResult, FilterPushdownPlan, FilterPushdownPropagation, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -491,39 +491,25 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { Ok(None) } - /// Attempts to recursively push given filters from the top of the tree into leafs. - /// - /// This is used for various optimizations, such as: - /// - /// * Pushing down filters into scans in general to minimize the amount of data that needs to be materialzied. - /// * Pushing down dynamic filters from operators like TopK and Joins into scans. - /// - /// Generally the further down (closer to leaf nodes) that filters can be pushed, the better. - /// - /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND b = 2`. - /// With no filter pushdown the scan needs to read and materialize all the data from `t` and then filter based on `a` and `b`. - /// With filter pushdown into the scan it can first read only `a`, then `b` and keep track of - /// which rows match the filter. - /// Then only for rows that match the filter does it have to materialize the rest of the columns. - /// - /// # Default Implementation - /// - /// The default implementation assumes: - /// * Parent filters can't be passed onto children. - /// * This node has no filters to contribute. - /// - /// # Implementation Notes - /// - /// Most of the actual logic is implemented as a Physical Optimizer rule. - /// See [`PushdownFilter`] for more details. - /// - /// [`PushdownFilter`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html - fn try_pushdown_filters( + fn gather_filters_for_pushdown( + &self, + parent_filters: &[Arc], + _config: &ConfigOptions, + ) -> Result { + Ok(FilterPushdownPlan::all_unsupported( + parent_filters, + self.children().len(), + )) + } + + fn handle_child_pushdown_result( &self, - fd: FilterDescription, + child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_not_supported(fd)) + ) -> Result>> { + Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, + )) } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6df3e236a0dd..c2a622ac08c9 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -27,7 +28,7 @@ use super::{ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - FilterDescription, FilterPushdownResult, FilterPushdownSupport, + ChildPushdownResult, FilterPushdown, FilterPushdownPlan, FilterPushdownPropagation, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -44,18 +45,19 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{BinaryExpr, Column}; +use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, - ExprBoundaries, PhysicalExpr, + analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext, + ConstExpr, ExprBoundaries, PhysicalExpr, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; @@ -448,55 +450,113 @@ impl ExecutionPlan for FilterExec { try_embed_projection(projection, self) } - fn try_pushdown_filters( + fn gather_filters_for_pushdown( &self, - mut fd: FilterDescription, + parent_filters: &[Arc], _config: &ConfigOptions, - ) -> Result>> { - // Extend the filter descriptions - fd.filters.push(Arc::clone(&self.predicate)); - - // Extract the information - let child_descriptions = vec![fd]; - let remaining_description = FilterDescription { filters: vec![] }; - let filter_input = Arc::clone(self.input()); - + ) -> Result { if let Some(projection_indices) = self.projection.as_ref() { - // Push the filters down, but leave a ProjectionExec behind, instead of the FilterExec - let filter_child_schema = filter_input.schema(); - let proj_exprs = projection_indices + // We need to invert the projection on any referenced columns in the filter + // Create a mapping from the output columns to the input columns (the inverse of the projection) + let inverse_projection = projection_indices .iter() - .map(|p| { - let field = filter_child_schema.field(*p).clone(); - ( - Arc::new(Column::new(field.name(), *p)) as Arc, - field.name().to_string(), - ) + .enumerate() + .map(|(i, &p)| (p, i)) + .collect::>(); + let predicate = Arc::clone(&self.predicate) + .transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let index = col.index(); + let index_in_input_schema = + inverse_projection.get(&index).ok_or_else(|| { + DataFusionError::Internal(format!( + "Column {} not found in projection", + index + )) + })?; + Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + *index_in_input_schema, + )) as _)) + } else { + Ok(Transformed::no(expr)) + } }) - .collect::>(); - let projection_exec = - Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) as _; - - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: projection_exec, - revisit: false, - }, - remaining_description, - }) + .data()?; + Ok(FilterPushdownPlan::all_supported(parent_filters, 1) + .with_self_filters_for_children(vec![vec![predicate]])) } else { - // Pull out the FilterExec, and inform the rule as it should be re-run - Ok(FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: filter_input, - revisit: true, - }, - remaining_description, - }) + Ok(FilterPushdownPlan::all_supported(parent_filters, 1) + .with_self_filters_for_children(vec![vec![Arc::clone(&self.predicate)]])) } } + + fn handle_child_pushdown_result( + &self, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + // We absorb any parent filters that were not handled by our children + let mut unhandled_filters = + child_pushdown_result.parent_filters.keep_unsupported(); + // Was our own predicate handled by our children? + assert_eq!( + child_pushdown_result.self_filters.len(), + 1, + "FilterExec should only have one child" + ); + println!("child_pushdown_result: {:?}", child_pushdown_result); + let child_filter = child_pushdown_result + .self_filters + .into_iter() + .next() + .expect("FilterExec should only have one child") + .into_iter() + .next() + .expect("FilterExec produces only one filter for each child"); + if let FilterPushdown::Unsupported(expr) = child_filter { + unhandled_filters.push(expr); + } + // If we have unhandled filters, we need to create a new FilterExec + let filter_input = Arc::clone(self.input()); + let new_predicate = conjunction(unhandled_filters); + let new_exec = if new_predicate.eq(&lit(true)) { + // FilterExec is no longer needed, but we may need to leave a projection in place + match self.projection() { + Some(projection_indices) => { + let filter_child_schema = filter_input.schema(); + let proj_exprs = projection_indices + .iter() + .map(|p| { + let field = filter_child_schema.field(*p).clone(); + ( + Arc::new(Column::new(field.name(), *p)) + as Arc, + field.name().to_string(), + ) + }) + .collect::>(); + Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) + as Arc + } + None => { + // No projection needed, just return the input + filter_input + } + } + } else { + // Create a new FilterExec with the new predicate + Arc::new( + FilterExec::try_new(new_predicate, filter_input)? + .with_default_selectivity(self.default_selectivity())? + .with_projection(self.projection().cloned())?, + ) + }; + Ok(FilterPushdownPropagation { + parent_filter_result: child_pushdown_result.parent_filters.as_supported(), + new_node: Some(new_exec), + }) + } } impl EmbeddedProjection for FilterExec { diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 38f5aef5923e..7330dc5cc76a 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -17,79 +17,194 @@ use std::sync::Arc; -use crate::ExecutionPlan; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -#[derive(Clone, Debug)] -pub struct FilterDescription { - /// Expressions coming from the parent nodes - pub filters: Vec>, +#[derive(Debug, Clone)] +pub enum FilterPushdown { + Supported(Arc), + Unsupported(Arc), } -impl Default for FilterDescription { - fn default() -> Self { - Self::empty() - } +#[derive(Debug, Clone)] +pub struct FilterPushdowns { + pub pushdowns: Vec, } -impl FilterDescription { - /// Takes the filters out of the struct, leaving an empty vector in its place. - pub fn take_description(&mut self) -> Vec> { - std::mem::take(&mut self.filters) +impl FilterPushdowns { + pub fn new(pushdowns: Vec) -> Self { + Self { pushdowns } + } + + pub fn all_supported(filters: &[Arc]) -> Self { + let pushdowns = filters + .iter() + .map(|f| FilterPushdown::Supported(Arc::clone(f))) + .collect(); + Self { pushdowns } + } + + pub fn all_unsupported(filters: &[Arc]) -> Self { + let pushdowns = filters + .iter() + .map(|f| FilterPushdown::Unsupported(Arc::clone(f))) + .collect(); + Self { pushdowns } + } + + /// Transform all filters to supported + pub fn as_supported(&self) -> Self { + let pushdowns = self + .pushdowns + .iter() + .map(|f| match f { + FilterPushdown::Supported(expr) => { + FilterPushdown::Supported(Arc::clone(expr)) + } + FilterPushdown::Unsupported(expr) => { + FilterPushdown::Supported(Arc::clone(expr)) + } + }) + .collect(); + Self { pushdowns } + } + + pub fn keep_supported(&self) -> Vec> { + self.pushdowns + .iter() + .filter_map(|f| match f { + FilterPushdown::Supported(expr) => Some(Arc::clone(expr)), + FilterPushdown::Unsupported(_) => None, + }) + .collect() + } + + pub fn keep_unsupported(&self) -> Vec> { + self.pushdowns + .iter() + .filter_map(|f| match f { + FilterPushdown::Unsupported(expr) => Some(Arc::clone(expr)), + FilterPushdown::Supported(_) => None, + }) + .collect() + } + + pub fn unpack(&self) -> Vec> { + self.pushdowns + .iter() + .map(|f| match f { + FilterPushdown::Supported(expr) => Arc::clone(expr), + FilterPushdown::Unsupported(expr) => Arc::clone(expr), + }) + .collect() + } + + pub fn into_inner(&self) -> Vec { + self.pushdowns.clone() + } + + pub fn supported( + filters: &[Arc], + mut f: impl FnMut(Arc) -> bool, + ) -> Self { + let pushdowns = filters + .iter() + .filter(|filt| f(Arc::clone(filt))) + .map(|f| FilterPushdown::Supported(Arc::clone(f))) + .collect(); + Self { pushdowns } } - pub fn empty() -> FilterDescription { - Self { filters: vec![] } + pub fn iter(&self) -> impl Iterator { + self.pushdowns.iter() + } + + pub fn len(&self) -> usize { + self.pushdowns.len() + } + + pub fn is_empty(&self) -> bool { + self.pushdowns.is_empty() + } +} + +impl IntoIterator for FilterPushdowns { + type Item = FilterPushdown; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.pushdowns.into_iter() } } -#[derive(Debug)] -pub enum FilterPushdownSupport { - Supported { - // Filter predicates which can be pushed down through the operator. - // NOTE that these are not placed into any operator. - child_descriptions: Vec, - // Possibly updated new operator - op: T, - // Whether the node is removed from the plan and the rule should be re-run manually - // on the new node. - // TODO: If TreeNodeRecursion supports Revisit mechanism, this flag can be removed - revisit: bool, - }, - NotSupported, +#[derive(Debug, Clone)] +pub struct ChildPushdownResult { + /// The combined result of pushing down each parent filter into each child. + pub parent_filters: FilterPushdowns, + /// The result of pushing down each filter this node provided into each of it's children. + pub self_filters: Vec, } -#[derive(Debug)] -pub struct FilterPushdownResult { - pub support: FilterPushdownSupport, - // Filters which cannot be pushed down through the operator. - // NOTE that caller of try_pushdown_filters() should handle these remanining predicates, - // possibly introducing a FilterExec on top of this operator. - pub remaining_description: FilterDescription, +/// The result of pushing down filters into a node that it returns to its parent. +#[derive(Debug, Clone)] +pub struct FilterPushdownPropagation { + pub parent_filter_result: FilterPushdowns, + pub new_node: Option, } -pub fn filter_pushdown_not_supported( - remaining_description: FilterDescription, -) -> FilterPushdownResult { - FilterPushdownResult { - support: FilterPushdownSupport::NotSupported, - remaining_description, +impl FilterPushdownPropagation { + /// Create a new [`FilterPushdownPropagation`] that tells the parent node + /// that echoes back up to the parent the result of pushing down the filters + /// into the children. + pub fn transparent(child_pushdown_result: ChildPushdownResult) -> Self { + Self { + parent_filter_result: child_pushdown_result.parent_filters, + new_node: None, + } } + + pub fn unsupported(parent_filters: &[Arc]) -> Self { + let unsupported = FilterPushdowns::all_unsupported(parent_filters); + Self { + parent_filter_result: unsupported, + new_node: None, + } + } +} + +#[derive(Debug, Clone)] +pub struct FilterPushdownPlan { + pub parent_filters_for_children: Vec, + pub self_filters_for_children: Vec>>, } -pub fn filter_pushdown_transparent( - plan: Arc, - fd: FilterDescription, -) -> FilterPushdownResult> { - let child_descriptions = vec![fd]; - let remaining_description = FilterDescription::empty(); - - FilterPushdownResult { - support: FilterPushdownSupport::Supported { - child_descriptions, - op: plan, - revisit: false, - }, - remaining_description, +impl FilterPushdownPlan { + pub fn all_unsupported( + parent_filters: &[Arc], + num_children: usize, + ) -> Self { + let unsupported = FilterPushdowns::all_unsupported(parent_filters); + Self { + parent_filters_for_children: vec![unsupported; num_children], + self_filters_for_children: vec![vec![]; num_children], + } + } + + pub fn all_supported( + parent_filters: &[Arc], + num_children: usize, + ) -> Self { + let supported = FilterPushdowns::all_supported(parent_filters); + Self { + parent_filters_for_children: vec![supported; num_children], + self_filters_for_children: vec![vec![]; num_children], + } + } + + pub fn with_self_filters_for_children( + mut self, + self_filters_for_children: Vec>>, + ) -> Self { + self.self_filters_for_children = self_filters_for_children; + self } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index f7c4f7477f12..9b4c3f7d65c4 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -54,7 +54,7 @@ use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use crate::filter_pushdown::{ - filter_pushdown_transparent, FilterDescription, FilterPushdownResult, + ChildPushdownResult, FilterPushdownPlan, FilterPushdownPropagation, }; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -743,14 +743,21 @@ impl ExecutionPlan for RepartitionExec { )?))) } - fn try_pushdown_filters( + fn gather_filters_for_pushdown( &self, - fd: FilterDescription, + parent_filters: &[Arc], _config: &ConfigOptions, - ) -> Result>> { - Ok(filter_pushdown_transparent::>( - Arc::new(self.clone()), - fd, + ) -> Result { + Ok(FilterPushdownPlan::all_supported(parent_filters, 1)) + } + + fn handle_child_pushdown_result( + &self, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::transparent( + child_pushdown_result, )) } } From 3e2240233c05e666b40abd70dfbbbe038e59a558 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 00:12:28 -0700 Subject: [PATCH 02/19] remove commented out code --- datafusion/datasource/src/file_scan_config.rs | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 8d0ef19d31c5..38a1010ce14b 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -616,46 +616,6 @@ impl DataSource for FileScanConfig { } } } - - // fn try_pushdown_filters( - // &self, - // parent_filters: &[Arc], - // config: &ConfigOptions, - // ) -> Result>> { - // // let FilterPushdownResult { - // // support, - // // remaining_description, - // // } = self.file_source.try_pushdown_filters(fd, config)?; - - // // match support { - // // FilterPushdownSupport::Supported { - // // child_descriptions, - // // op, - // // revisit, - // // } => { - // // let new_data_source = Arc::new( - // // FileScanConfigBuilder::from(self.clone()) - // // .with_source(op) - // // .build(), - // // ); - - // // debug_assert!(child_descriptions.is_empty()); - // // debug_assert!(!revisit); - - // // Ok(FilterPushdownResult { - // // support: FilterPushdownSupport::Supported { - // // child_descriptions, - // // op: new_data_source, - // // revisit, - // // }, - // // remaining_description, - // // }) - // // } - // // FilterPushdownSupport::NotSupported => { - // // Ok(filter_pushdown_not_supported(remaining_description)) - // // } - // // } - // } } impl FileScanConfig { From 145a313e4da51a7e5a9ccf6b8035a190d62b9605 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 10:58:09 -0700 Subject: [PATCH 03/19] fix tests --- .../physical_optimizer/push_down_filter.rs | 11 +++++------ .../physical-optimizer/src/push_down_filter.rs | 18 ++++++++++++------ datafusion/physical-plan/src/filter.rs | 1 - 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index 3d1e832b5a45..e198dacf2b3d 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -273,7 +273,7 @@ fn test_filter_collapse() { - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true output: Ok: - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar " ); } @@ -301,7 +301,7 @@ fn test_filter_with_projection() { output: Ok: - ProjectionExec: expr=[b@1 as b, a@0 as a] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@1 = foo ", ); @@ -322,9 +322,8 @@ fn test_filter_with_projection() { - FilterExec: a@0 = foo, projection=[b@1] - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true output: - Ok: - - ProjectionExec: expr=[b@1 as b] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo + Err: Internal error: Column 0 not found in projection. + This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker " ); } @@ -357,7 +356,7 @@ fn test_push_down_through_transparent_nodes() { Ok: - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0 - CoalesceBatchesExec: target_batch_size=1 - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar " ); } diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/push_down_filter.rs index 6c73ade6f79b..129442e343c0 100644 --- a/datafusion/physical-optimizer/src/push_down_filter.rs +++ b/datafusion/physical-optimizer/src/push_down_filter.rs @@ -431,8 +431,7 @@ fn push_down_filters( // We need to de-entangle this since we do need to distinguish between them. let parent_filters = result.parent_filter_result.into_inner(); let (self_filters, parent_filters) = parent_filters.split_at(num_self_filters); - self_filter_pushdown_result - .push(FilterPushdowns::new(self_filters.to_vec())); + self_filter_pushdown_result.push(FilterPushdowns::new(self_filters.to_vec())); for (idx, result) in parent_filter_indices.iter().zip(parent_filters) { let current_node_state = match result { FilterPushdown::Supported(_) => FilterPushdownState::Supported, @@ -455,7 +454,7 @@ fn push_down_filters( } } // Re-create this node with new children - let node = with_new_children_if_necessary(Arc::clone(node), new_children)?; + let new_node = with_new_children_if_necessary(Arc::clone(node), new_children)?; // Remap the result onto the parent filters as they were given to us. // Any filters that were not pushed down to any children are marked as unsupported. let parent_pushdown_result = FilterPushdowns::new( @@ -470,13 +469,20 @@ fn push_down_filters( .collect(), ); // Check what the current node wants to do given the result of pushdown to it's children - Arc::clone(&node).handle_child_pushdown_result( + let mut res = Arc::clone(&new_node).handle_child_pushdown_result( ChildPushdownResult { parent_filters: parent_pushdown_result, self_filters: self_filter_pushdown_result, }, config, - ) + )?; + // Compare pointers for new_node and node, if they are different we must replace ourselves because of + // changes in our children. + // So updated res. + if res.new_node.is_none() && !Arc::ptr_eq(&new_node, node) { + res.new_node = Some(new_node) + } + Ok(res) } impl PhysicalOptimizerRule for PushdownFilter { @@ -487,7 +493,7 @@ impl PhysicalOptimizerRule for PushdownFilter { ) -> Result> { Ok(push_down_filters(&plan, vec![], config)? .new_node - .unwrap_or(Arc::clone(&plan))) + .unwrap_or(plan)) } fn name(&self) -> &str { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index c2a622ac08c9..4ad56aa8bea7 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -505,7 +505,6 @@ impl ExecutionPlan for FilterExec { 1, "FilterExec should only have one child" ); - println!("child_pushdown_result: {:?}", child_pushdown_result); let child_filter = child_pushdown_result .self_filters .into_iter() From ac463e99cf1166d3f9a03dbdeb7d083e746beacd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 11:04:36 -0700 Subject: [PATCH 04/19] fail to fix bug --- datafusion/core/tests/physical_optimizer/push_down_filter.rs | 5 +++-- datafusion/physical-plan/src/filter.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index e198dacf2b3d..fe257817446f 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -322,8 +322,9 @@ fn test_filter_with_projection() { - FilterExec: a@0 = foo, projection=[b@1] - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true output: - Err: Internal error: Column 0 not found in projection. - This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker + Ok: + - ProjectionExec: expr=[b@1 as b] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@1 = foo " ); } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 4ad56aa8bea7..bd1ff7fe627f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -461,7 +461,7 @@ impl ExecutionPlan for FilterExec { let inverse_projection = projection_indices .iter() .enumerate() - .map(|(i, &p)| (p, i)) + .map(|(i, &p)| (i, p)) .collect::>(); let predicate = Arc::clone(&self.predicate) .transform_up(|expr| { From c0598948b7afb0403854f438f13f1ad4157af0fb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 13:49:27 -0700 Subject: [PATCH 05/19] fix --- .../physical_optimizer/push_down_filter.rs | 15 +++--- datafusion/physical-plan/src/filter.rs | 48 +++++++++++-------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index fe257817446f..00b5aa3e86a8 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -283,12 +283,14 @@ fn test_filter_with_projection() { let scan = test_scan(true); let projection = vec![1, 0]; let predicate = col_lit_predicate("a", "foo", schema()); - let plan = Arc::new( - FilterExec::try_new(predicate, Arc::clone(&scan)) + let filter = Arc::new( + FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&scan)) .unwrap() .with_projection(Some(projection)) .unwrap(), ); + let predicate = col_lit_predicate("b", "bar", &filter.schema()); + let plan = Arc::new(FilterExec::try_new(predicate, filter).unwrap()); // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec insta::assert_snapshot!( @@ -296,12 +298,13 @@ fn test_filter_with_projection() { @r" OptimizationTest: input: - - FilterExec: a@0 = foo, projection=[b@1, a@0] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true + - FilterExec: b@0 = bar + - FilterExec: a@0 = foo, projection=[b@1, a@0] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true output: Ok: - ProjectionExec: expr=[b@1 as b, a@0 as a] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@1 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar ", ); @@ -324,7 +327,7 @@ fn test_filter_with_projection() { output: Ok: - ProjectionExec: expr=[b@1 as b] - - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@1 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo " ); } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index bd1ff7fe627f..36269cc70bc5 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -461,30 +461,36 @@ impl ExecutionPlan for FilterExec { let inverse_projection = projection_indices .iter() .enumerate() - .map(|(i, &p)| (i, p)) + .map(|(i, &p)| (p, i)) .collect::>(); - let predicate = Arc::clone(&self.predicate) - .transform_up(|expr| { - if let Some(col) = expr.as_any().downcast_ref::() { - let index = col.index(); - let index_in_input_schema = - inverse_projection.get(&index).ok_or_else(|| { - DataFusionError::Internal(format!( - "Column {} not found in projection", - index + let parent_filters = parent_filters + .iter() + .map(|f| { + Arc::clone(f) + .transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let index = col.index(); + let index_in_input_schema = + inverse_projection.get(&index).ok_or_else(|| { + DataFusionError::Internal(format!( + "Column {} not found in projection", + index + )) + })?; + Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + *index_in_input_schema, )) - })?; - Ok(Transformed::yes(Arc::new(Column::new( - col.name(), - *index_in_input_schema, - )) as _)) - } else { - Ok(Transformed::no(expr)) - } + as _)) + } else { + Ok(Transformed::no(expr)) + } + }) + .data() }) - .data()?; - Ok(FilterPushdownPlan::all_supported(parent_filters, 1) - .with_self_filters_for_children(vec![vec![predicate]])) + .collect::>>()?; + Ok(FilterPushdownPlan::all_supported(&parent_filters, 1) + .with_self_filters_for_children(vec![vec![Arc::clone(&self.predicate)]])) } else { Ok(FilterPushdownPlan::all_supported(parent_filters, 1) .with_self_filters_for_children(vec![vec![Arc::clone(&self.predicate)]])) From 6c36992cb834a710280e9a462ae5154e5497937b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 17:12:23 -0700 Subject: [PATCH 06/19] add/fix docs --- datafusion/datasource/src/file.rs | 4 +-- datafusion/datasource/src/source.rs | 4 ++- .../src/push_down_filter.rs | 11 ++++---- .../physical-plan/src/execution_plan.rs | 28 +++++++++++++++++++ 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 5d6eff9d1903..4a6770a096df 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -106,9 +106,9 @@ pub trait FileSource: Send + Sync { } /// Try to push down filters into this FileSource. - /// See [`ExecutionPlan::try_pushdown_filters`] for more details. + /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. /// - /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters + /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result fn try_pushdown_filters( &self, filters: &[Arc], diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 622eabe6b6ce..289e4b4ed2a3 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -94,7 +94,9 @@ pub trait DataSource: Send + Sync + Debug { _projection: &ProjectionExec, ) -> Result>>; /// Try to push down filters into this DataSource. - /// See [`ExecutionPlan::try_pushdown_filters`] for more details. + /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details. + /// + /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result fn try_pushdown_filters( &self, filters: &[Arc], diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/push_down_filter.rs index 129442e343c0..3e23ece044ef 100644 --- a/datafusion/physical-optimizer/src/push_down_filter.rs +++ b/datafusion/physical-optimizer/src/push_down_filter.rs @@ -31,11 +31,12 @@ use itertools::izip; /// /// # Default Implementation /// -/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a no-op -/// that assumes that: +/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`] +/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that: /// -/// * Parent filters can't be passed onto children. -/// * This node has no filters to contribute. +/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`]) +/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]). +/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]). /// /// # Example: Push filter into a `DataSourceExec` /// @@ -238,7 +239,7 @@ use itertools::izip; /// The point here is that: /// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node. /// Any filters above the [`AggregateExec`] node are not pushed down. -/// This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node. +/// This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node. /// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push /// down the `id=1` filter. /// diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 58428b49e658..21c8d8019f98 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -491,6 +491,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { Ok(None) } + /// Collect filters that this node can push down to its children. + /// Filters that are being pushed down from parents are passed in, + /// and the node may generate additional filters to push down. + /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec, + /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`: + /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent + /// filters so it only returns that `FilterExec` wants to push down its own predicate. + /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from + /// `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key) + /// but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join). + /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec` + /// and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything + /// since it has no children and no additional filters to push down. + /// It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse + /// up the plan that `DataSourceExec` can actually bind the filters. + /// + /// The default implementation bars all parent filters from being pushed down and adds no new filters. + /// This is the safest option, making filter pushdown opt-in on a per-node pasis. fn gather_filters_for_pushdown( &self, parent_filters: &[Arc], @@ -502,6 +520,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { )) } + /// Handle the result of a child pushdown. + /// This is called as we recurse back up the plan tree after recursing down and calling [`ExecutionPlan::gather_filters_for_pushdown`]. + /// Once we know what the result of pushing down filters into children is we ask the current node what it wants to do with that result. + /// For a `DataSourceExec` that may be absorbing the filters to apply them during the scan phase + /// (also known as late materialization). + /// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it + /// may remove itself from the plan altogether. + /// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow. + /// + /// The default implementation is a no-op that passes the result of pushdown from the children to its parent. fn handle_child_pushdown_result( &self, child_pushdown_result: ChildPushdownResult, From 7d0c68cb075aa69abe79704eb792a11fd2ba195d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 22 Apr 2025 17:26:33 -0700 Subject: [PATCH 07/19] lint --- datafusion/physical-plan/src/execution_plan.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 21c8d8019f98..9dccd0eb7bd1 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -499,14 +499,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent /// filters so it only returns that `FilterExec` wants to push down its own predicate. /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from - /// `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key) - /// but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join). + /// `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key) + /// but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join). /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec` /// and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything /// since it has no children and no additional filters to push down. /// It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse /// up the plan that `DataSourceExec` can actually bind the filters. - /// + /// /// The default implementation bars all parent filters from being pushed down and adds no new filters. /// This is the safest option, making filter pushdown opt-in on a per-node pasis. fn gather_filters_for_pushdown( @@ -528,7 +528,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it /// may remove itself from the plan altogether. /// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow. - /// + /// /// The default implementation is a no-op that passes the result of pushdown from the children to its parent. fn handle_child_pushdown_result( &self, From c765a2393ebd1366eda2516aeca109d94903fc65 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 24 Apr 2025 17:56:02 -0700 Subject: [PATCH 08/19] add some docstrings, some minimal cleaup --- datafusion/datasource/src/source.rs | 2 +- .../src/push_down_filter.rs | 2 +- datafusion/physical-plan/src/filter.rs | 7 +- .../physical-plan/src/filter_pushdown.rs | 109 ++++++++++-------- 4 files changed, 67 insertions(+), 53 deletions(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 289e4b4ed2a3..ffb9c22981cb 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -245,7 +245,7 @@ impl ExecutionPlan for DataSourceExec { ) -> Result>> { // Push any remaining filters into our data source let res = self.data_source.try_pushdown_filters( - &child_pushdown_result.parent_filters.unpack(), + &child_pushdown_result.parent_filters.into_inner_filters(), config, )?; match res.new_node { diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/push_down_filter.rs index 3e23ece044ef..e6a4b4b552bd 100644 --- a/datafusion/physical-optimizer/src/push_down_filter.rs +++ b/datafusion/physical-optimizer/src/push_down_filter.rs @@ -397,7 +397,7 @@ fn push_down_filters( ) { let mut parent_filter_indices = vec![]; let num_self_filters = filters_for_child.len(); - for (idx, filter) in parent_filters.into_iter().enumerate() { + for (idx, filter) in parent_filters.into_inner().into_iter().enumerate() { // Check if we can push this filter down to our children match filter { FilterPushdown::Supported(f) => { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 36269cc70bc5..a76b5e68c585 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -504,7 +504,7 @@ impl ExecutionPlan for FilterExec { ) -> Result>> { // We absorb any parent filters that were not handled by our children let mut unhandled_filters = - child_pushdown_result.parent_filters.keep_unsupported(); + child_pushdown_result.parent_filters.collect_unsupported(); // Was our own predicate handled by our children? assert_eq!( child_pushdown_result.self_filters.len(), @@ -516,9 +516,10 @@ impl ExecutionPlan for FilterExec { .into_iter() .next() .expect("FilterExec should only have one child") - .into_iter() + .iter() .next() - .expect("FilterExec produces only one filter for each child"); + .expect("FilterExec produces only one filter for each child") + .clone(); if let FilterPushdown::Unsupported(expr) = child_filter { unhandled_filters.push(expr); } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 7330dc5cc76a..63f3a756a256 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -19,42 +19,51 @@ use std::sync::Arc; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +/// The result of or a plan for pushing down a filter into a child node. +/// This contains references to filters so that nodes can mutate a filter +/// before pushing it down to a child node (e.g. to adjust a projection) +/// or can directly take ownership of `Unsupported` filters that their children +/// could not handle. #[derive(Debug, Clone)] pub enum FilterPushdown { Supported(Arc), Unsupported(Arc), } +/// A thin wrapper around [`FilterPushdown`]s that allows for easy collection of +/// supported and unsupported filters. #[derive(Debug, Clone)] -pub struct FilterPushdowns { - pub pushdowns: Vec, -} +pub struct FilterPushdowns(Vec); impl FilterPushdowns { + /// Create a new FilterPushdowns with the given filters and their pushdown status. pub fn new(pushdowns: Vec) -> Self { - Self { pushdowns } + Self(pushdowns) } + /// Create a new FilterPushdowns with all filters as supported. pub fn all_supported(filters: &[Arc]) -> Self { let pushdowns = filters .iter() .map(|f| FilterPushdown::Supported(Arc::clone(f))) .collect(); - Self { pushdowns } + Self::new(pushdowns) } + /// Create a new FilterPushdowns with all filters as unsupported. pub fn all_unsupported(filters: &[Arc]) -> Self { let pushdowns = filters .iter() .map(|f| FilterPushdown::Unsupported(Arc::clone(f))) .collect(); - Self { pushdowns } + Self::new(pushdowns) } - /// Transform all filters to supported + /// Transform all filters to supported, returning a new FilterPushdowns. + /// This does not modify the original FilterPushdowns. pub fn as_supported(&self) -> Self { let pushdowns = self - .pushdowns + .0 .iter() .map(|f| match f { FilterPushdown::Supported(expr) => { @@ -65,21 +74,13 @@ impl FilterPushdowns { } }) .collect(); - Self { pushdowns } - } - - pub fn keep_supported(&self) -> Vec> { - self.pushdowns - .iter() - .filter_map(|f| match f { - FilterPushdown::Supported(expr) => Some(Arc::clone(expr)), - FilterPushdown::Unsupported(_) => None, - }) - .collect() + Self::new(pushdowns) } - pub fn keep_unsupported(&self) -> Vec> { - self.pushdowns + /// Collect unsupported filters into a Vec, without removing them from the original + /// FilterPushdowns. + pub fn collect_unsupported(&self) -> Vec> { + self.0 .iter() .filter_map(|f| match f { FilterPushdown::Unsupported(expr) => Some(Arc::clone(expr)), @@ -88,8 +89,10 @@ impl FilterPushdowns { .collect() } - pub fn unpack(&self) -> Vec> { - self.pushdowns + /// Collect all filters as PhysicalExprs into a Vec, without removing them from the original + /// FilterPushdowns. + pub fn into_inner_filters(&self) -> Vec> { + self.0 .iter() .map(|f| match f { FilterPushdown::Supported(expr) => Arc::clone(expr), @@ -98,53 +101,61 @@ impl FilterPushdowns { .collect() } + /// Return the inner `Vec` without modifying the original FilterPushdowns. pub fn into_inner(&self) -> Vec { - self.pushdowns.clone() - } - - pub fn supported( - filters: &[Arc], - mut f: impl FnMut(Arc) -> bool, - ) -> Self { - let pushdowns = filters - .iter() - .filter(|filt| f(Arc::clone(filt))) - .map(|f| FilterPushdown::Supported(Arc::clone(f))) - .collect(); - Self { pushdowns } + self.0.clone() } + /// Return an iterator over the inner `Vec`. pub fn iter(&self) -> impl Iterator { - self.pushdowns.iter() + self.0.iter() } + /// Return the number of filters in the inner `Vec`. pub fn len(&self) -> usize { - self.pushdowns.len() + self.0.len() } + /// Check if the inner `Vec` is empty. pub fn is_empty(&self) -> bool { - self.pushdowns.is_empty() - } -} - -impl IntoIterator for FilterPushdowns { - type Item = FilterPushdown; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.pushdowns.into_iter() + self.0.is_empty() } } +/// The result of pushing down filters into a child node. +/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`]. +/// Nodes process this result and convert it into a [`FilterPushdownPropagation`] +/// that is returned to their parent. +/// +/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct ChildPushdownResult { /// The combined result of pushing down each parent filter into each child. + /// For example, given the fitlers `[a, b]` and children `[1, 2, 3]` the matrix of responses: + /// + // | filter | child 1 | child 2 | child 3 | result | + // |--------|-------------|-----------|-----------|-------------| + // | a | Supported | Supported | Supported | Supported | + // | b | Unsupported | Supported | Supported | Unsupported | + /// + /// That is: if any child marks a filter as unsupported or if the filter was not pushed + /// down into any child then the result is unsupported. + /// If at least one children and all children that received the filter mark it as supported + /// then the result is supported. pub parent_filters: FilterPushdowns, /// The result of pushing down each filter this node provided into each of it's children. + /// This is not combined with the parent filters so that nodes can treat each child independently. pub self_filters: Vec, } /// The result of pushing down filters into a node that it returns to its parent. +/// This is what nodes return from [`ExecutionPlan::handle_child_pushdown_result`] to communicate +/// to the optimizer: +/// +/// 1. What to do with any parent filters that were not completely handled by the children. +/// 2. If the node needs to be replaced in the execution plan with a new node or not. +/// +/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { pub parent_filter_result: FilterPushdowns, @@ -162,6 +173,8 @@ impl FilterPushdownPropagation { } } + /// Create a new [`FilterPushdownPropagation`] that tells the parent node + /// that none of the parent filters were not pushed down. pub fn unsupported(parent_filters: &[Arc]) -> Self { let unsupported = FilterPushdowns::all_unsupported(parent_filters); Self { From 838c07101cc6306d6f39b95d200a9413884db855 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Sun, 27 Apr 2025 22:55:00 +0300 Subject: [PATCH 09/19] review suggestions --- .../physical_optimizer/push_down_filter.rs | 31 ++- datafusion/datasource/src/file.rs | 2 +- datafusion/datasource/src/file_scan_config.rs | 22 +- datafusion/datasource/src/source.rs | 14 +- ...push_down_filter.rs => filter_pushdown.rs} | 210 ++++++++++-------- datafusion/physical-optimizer/src/lib.rs | 2 +- .../physical-optimizer/src/optimizer.rs | 4 +- .../physical-plan/src/coalesce_batches.rs | 11 +- .../physical-plan/src/execution_plan.rs | 8 +- datafusion/physical-plan/src/filter.rs | 91 ++++---- .../physical-plan/src/filter_pushdown.rs | 132 +++++------ .../physical-plan/src/repartition/mod.rs | 11 +- 12 files changed, 298 insertions(+), 240 deletions(-) rename datafusion/physical-optimizer/src/{push_down_filter.rs => filter_pushdown.rs} (79%) diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs index 00b5aa3e86a8..326a7b837e7a 100644 --- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs +++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs @@ -44,10 +44,10 @@ use datafusion_physical_expr::{ aggregate::AggregateExprBuilder, conjunction, Partitioning, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; -use datafusion_physical_optimizer::push_down_filter::PushdownFilter; +use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter_pushdown::{ - FilterPushdownPropagation, FilterPushdowns, + FilterPushdownPropagation, PredicateSupports, }; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, @@ -153,22 +153,21 @@ impl FileSource for TestSource { fn try_pushdown_filters( &self, - filters: &[Arc], + mut filters: Vec>, config: &ConfigOptions, ) -> Result>> { - let mut all_filters = filters.iter().map(Arc::clone).collect::>(); if self.support && config.execution.parquet.pushdown_filters { if let Some(internal) = self.predicate.as_ref() { - all_filters.push(Arc::clone(internal)); + filters.push(Arc::clone(internal)); } let new_node = Arc::new(TestSource { support: true, - predicate: Some(conjunction(all_filters.clone())), + predicate: Some(conjunction(filters.clone())), statistics: self.statistics.clone(), // should be updated in reality }); Ok(FilterPushdownPropagation { - parent_filter_result: FilterPushdowns::all_supported(&all_filters), - new_node: Some(new_node), + filters: PredicateSupports::all_supported(filters), + updated_node: Some(new_node), }) } else { Ok(FilterPushdownPropagation::unsupported(filters)) @@ -196,7 +195,7 @@ fn test_pushdown_into_scan() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: @@ -220,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() { insta::assert_snapshot!( OptimizationTest::new( Arc::clone(&plan), - PushdownFilter {}, + FilterPushdown {}, false ), @r" @@ -239,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() { insta::assert_snapshot!( OptimizationTest::new( plan, - PushdownFilter {}, + FilterPushdown {}, true ), @r" @@ -264,7 +263,7 @@ fn test_filter_collapse() { let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: @@ -294,7 +293,7 @@ fn test_filter_with_projection() { // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: @@ -318,7 +317,7 @@ fn test_filter_with_projection() { .unwrap(), ); insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{},true), + OptimizationTest::new(plan, FilterPushdown{},true), @r" OptimizationTest: input: @@ -347,7 +346,7 @@ fn test_push_down_through_transparent_nodes() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{},true), + OptimizationTest::new(plan, FilterPushdown{},true), @r" OptimizationTest: input: @@ -411,7 +410,7 @@ fn test_no_pushdown_through_aggregates() { // expect the predicate to be pushed down into the DataSource insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownFilter{}, true), + OptimizationTest::new(plan, FilterPushdown{}, true), @r" OptimizationTest: input: diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 4a6770a096df..c9b5c416f0c0 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -111,7 +111,7 @@ pub trait FileSource: Send + Sync { /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result fn try_pushdown_filters( &self, - filters: &[Arc], + filters: Vec>, _config: &ConfigOptions, ) -> Result>> { Ok(FilterPushdownPropagation::unsupported(filters)) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 38a1010ce14b..ae94af5a7b26 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -594,24 +594,28 @@ impl DataSource for FileScanConfig { fn try_pushdown_filters( &self, - filters: &[Arc], + filters: Vec>, config: &ConfigOptions, ) -> Result>> { let result = self.file_source.try_pushdown_filters(filters, config)?; - match result.new_node { - Some(new_node) => { - let mut new_data_source = self.clone(); - new_data_source.file_source = new_node; + match result.updated_node { + Some(new_file_source) => { + let file_scan_config = FileScanConfigBuilder::new( + self.object_store_url.clone(), + Arc::clone(&self.file_schema), + new_file_source, + ) + .build(); Ok(FilterPushdownPropagation { - parent_filter_result: result.parent_filter_result, - new_node: Some(Arc::new(new_data_source) as _), + filters: result.filters, + updated_node: Some(Arc::new(file_scan_config) as _), }) } None => { // If the file source does not support filter pushdown, return the original config Ok(FilterPushdownPropagation { - parent_filter_result: result.parent_filter_result, - new_node: None, + filters: result.filters, + updated_node: None, }) } } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index ffb9c22981cb..cf42347e3aba 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -99,7 +99,7 @@ pub trait DataSource: Send + Sync + Debug { /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result fn try_pushdown_filters( &self, - filters: &[Arc], + filters: Vec>, _config: &ConfigOptions, ) -> Result>> { Ok(FilterPushdownPropagation::unsupported(filters)) @@ -245,23 +245,23 @@ impl ExecutionPlan for DataSourceExec { ) -> Result>> { // Push any remaining filters into our data source let res = self.data_source.try_pushdown_filters( - &child_pushdown_result.parent_filters.into_inner_filters(), + child_pushdown_result.parent_filters.collect_all(), config, )?; - match res.new_node { + match res.updated_node { Some(data_source) => { let mut new_node = self.clone(); new_node.data_source = data_source; new_node.cache = Self::compute_properties(Arc::clone(&new_node.data_source)); Ok(FilterPushdownPropagation { - parent_filter_result: res.parent_filter_result, - new_node: Some(Arc::new(new_node)), + filters: res.filters, + updated_node: Some(Arc::new(new_node)), }) } None => Ok(FilterPushdownPropagation { - parent_filter_result: res.parent_filter_result, - new_node: None, + filters: res.filters, + updated_node: None, }), } } diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs similarity index 79% rename from datafusion/physical-optimizer/src/push_down_filter.rs rename to datafusion/physical-optimizer/src/filter_pushdown.rs index e6a4b4b552bd..ca4c5920a224 100644 --- a/datafusion/physical-optimizer/src/push_down_filter.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -22,9 +22,10 @@ use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdown, FilterPushdownPropagation, FilterPushdowns, + ChildPushdownResult, FilterPushdownPropagation, PredicateSupport, PredicateSupports, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; + use itertools::izip; /// Attempts to recursively push given filters from the top of the tree into leafs. @@ -361,153 +362,180 @@ use itertools::izip; /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec #[derive(Debug)] -pub struct PushdownFilter {} -impl Default for PushdownFilter { +pub struct FilterPushdown {} + +impl FilterPushdown { + pub fn new() -> Self { + Self {} + } +} + +impl Default for FilterPushdown { fn default() -> Self { Self::new() } } +impl PhysicalOptimizerRule for FilterPushdown { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + Ok(push_down_filters(Arc::clone(&plan), vec![], config)? + .updated_node + .unwrap_or(plan)) + } + + fn name(&self) -> &str { + "PushdownFilter" + } + + fn schema_check(&self) -> bool { + true // Filter pushdown does not change the schema of the plan + } +} + +/// Support state of each predicate for the children of the node. +/// These predicates are coming from the parent node. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum FilterPushdownState { +enum ParentPredicateStates { NoChildren, Unsupported, Supported, } fn push_down_filters( - node: &Arc, - parent_filters: Vec>, + node: Arc, + parent_predicates: Vec>, config: &ConfigOptions, ) -> Result>> { - let pushdown_plan = node.gather_filters_for_pushdown(&parent_filters, config)?; - let children = node - .children() - .into_iter() - .map(Arc::clone) - .collect::>(); - let mut parent_pushdown_result = - vec![FilterPushdownState::NoChildren; parent_filters.len()]; - let mut self_filter_pushdown_result = vec![]; + // If the node has any child, these will be rewritten as supported or unsupported + let mut parent_predicates_pushdown_states = + vec![ParentPredicateStates::NoChildren; parent_predicates.len()]; + let mut self_filters_pushdown_supports = vec![]; let mut new_children = vec![]; - for (child, parent_filters, mut filters_for_child) in izip!( + + let children = node.children(); + let filter_description = + node.gather_filters_for_pushdown(parent_predicates.clone(), config)?; + + for (child, parent_filters, self_filters) in izip!( children, - pushdown_plan.parent_filters_for_children, - pushdown_plan.self_filters_for_children + filter_description.parent_filters, + filter_description.self_filters ) { - let mut parent_filter_indices = vec![]; - let num_self_filters = filters_for_child.len(); - for (idx, filter) in parent_filters.into_inner().into_iter().enumerate() { - // Check if we can push this filter down to our children + // Here, `parent_filters` are the predicates which are provided by the parent node of + // the current node, and tried to be pushed down over the child which the loop points + // currently. `self_filters` are the predicates which are provided by the current node, + // and tried to be pushed down over the child similarly. + + let num_self_filters = self_filters.len(); + let mut parent_supported_predicate_indices = vec![]; + let mut all_predicates = self_filters; + + // Iterate over each predicate coming from the parent + for (idx, filter) in parent_filters.into_iter().enumerate() { + // Check if we can push this filter down to our child. + // These supports are defined in `gather_filters_for_pushdown()` match filter { - FilterPushdown::Supported(f) => { + PredicateSupport::Supported(predicate) => { // Queue this filter up for pushdown to this child - filters_for_child.push(f); - parent_filter_indices.push(idx); + all_predicates.push(predicate); + parent_supported_predicate_indices.push(idx); // Mark this filter as supported by our children if no child has marked it as unsupported - if matches!( - parent_pushdown_result[idx], - FilterPushdownState::NoChildren - ) { - parent_pushdown_result[idx] = FilterPushdownState::Supported; + if parent_predicates_pushdown_states[idx] + != ParentPredicateStates::Unsupported + { + parent_predicates_pushdown_states[idx] = + ParentPredicateStates::Supported; } } - FilterPushdown::Unsupported(_) => { + PredicateSupport::Unsupported(_) => { // Mark as unsupported by our children - parent_pushdown_result[idx] = FilterPushdownState::Unsupported; + parent_predicates_pushdown_states[idx] = + ParentPredicateStates::Unsupported; } } } + // Any filters that could not be pushed down to a child are marked as not-supported to our parents - let result = push_down_filters(&child, filters_for_child, config)?; - if let Some(new_child) = result.new_node { + let result = push_down_filters(Arc::clone(child), all_predicates, config)?; + + if let Some(new_child) = result.updated_node { // If we have a filter pushdown result, we need to update our children new_children.push(new_child); } else { // If we don't have a filter pushdown result, we need to update our children - new_children.push(child); + new_children.push(Arc::clone(child)); } - // Our child doesn't know the difference between filters that were passed down from our parents - // and filters that the current node injected. - // We need to de-entangle this since we do need to distinguish between them. - let parent_filters = result.parent_filter_result.into_inner(); - let (self_filters, parent_filters) = parent_filters.split_at(num_self_filters); - self_filter_pushdown_result.push(FilterPushdowns::new(self_filters.to_vec())); - for (idx, result) in parent_filter_indices.iter().zip(parent_filters) { + + // Our child doesn't know the difference between filters that were passed down + // from our parents and filters that the current node injected. We need to de-entangle + // this since we do need to distinguish between them. + let mut all_filters = result.filters.into_inner(); + let parent_predicates = all_filters.split_off(num_self_filters); + let self_predicates = all_filters; + self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates)); + + for (idx, result) in parent_supported_predicate_indices + .iter() + .zip(parent_predicates) + { let current_node_state = match result { - FilterPushdown::Supported(_) => FilterPushdownState::Supported, - FilterPushdown::Unsupported(_) => FilterPushdownState::Unsupported, + PredicateSupport::Supported(_) => ParentPredicateStates::Supported, + PredicateSupport::Unsupported(_) => ParentPredicateStates::Unsupported, }; - match (current_node_state, parent_pushdown_result[*idx]) { - (r, FilterPushdownState::NoChildren) => { + match (current_node_state, parent_predicates_pushdown_states[*idx]) { + (r, ParentPredicateStates::NoChildren) => { // If we have no result, use the current state from this child - parent_pushdown_result[*idx] = r; + parent_predicates_pushdown_states[*idx] = r; } - (FilterPushdownState::Supported, FilterPushdownState::Supported) => { - // If the current child and all previous children are supported the filter continue to support it - parent_pushdown_result[*idx] = FilterPushdownState::Supported; + (ParentPredicateStates::Supported, ParentPredicateStates::Supported) => { + // If the current child and all previous children are supported, + // the filter continues to support it + parent_predicates_pushdown_states[*idx] = + ParentPredicateStates::Supported; } _ => { // Either the current child or a previous child marked this filter as unsupported - parent_pushdown_result[*idx] = FilterPushdownState::Unsupported; + parent_predicates_pushdown_states[*idx] = + ParentPredicateStates::Unsupported; } } } } // Re-create this node with new children - let new_node = with_new_children_if_necessary(Arc::clone(node), new_children)?; + let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?; // Remap the result onto the parent filters as they were given to us. // Any filters that were not pushed down to any children are marked as unsupported. - let parent_pushdown_result = FilterPushdowns::new( - parent_pushdown_result + let parent_pushdown_result = PredicateSupports::new( + parent_predicates_pushdown_states .into_iter() - .zip(parent_filters) + .zip(parent_predicates) .map(|(state, filter)| match state { - FilterPushdownState::NoChildren => FilterPushdown::Unsupported(filter), - FilterPushdownState::Unsupported => FilterPushdown::Unsupported(filter), - FilterPushdownState::Supported => FilterPushdown::Supported(filter), + ParentPredicateStates::NoChildren => { + PredicateSupport::Unsupported(filter) + } + ParentPredicateStates::Unsupported => { + PredicateSupport::Unsupported(filter) + } + ParentPredicateStates::Supported => PredicateSupport::Supported(filter), }) .collect(), ); // Check what the current node wants to do given the result of pushdown to it's children - let mut res = Arc::clone(&new_node).handle_child_pushdown_result( + let mut res = updated_node.handle_child_pushdown_result( ChildPushdownResult { parent_filters: parent_pushdown_result, - self_filters: self_filter_pushdown_result, + self_filters: self_filters_pushdown_supports, }, config, )?; - // Compare pointers for new_node and node, if they are different we must replace ourselves because of - // changes in our children. - // So updated res. - if res.new_node.is_none() && !Arc::ptr_eq(&new_node, node) { - res.new_node = Some(new_node) + // Compare pointers for new_node and node, if they are different we must replace + // ourselves because of changes in our children. + if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) { + res.updated_node = Some(updated_node) } Ok(res) } - -impl PhysicalOptimizerRule for PushdownFilter { - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result> { - Ok(push_down_filters(&plan, vec![], config)? - .new_node - .unwrap_or(plan)) - } - - fn name(&self) -> &str { - "PushdownFilter" - } - - fn schema_check(&self) -> bool { - true // Filter pushdown does not change the schema of the plan - } -} - -impl PushdownFilter { - pub fn new() -> Self { - Self {} - } -} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 57dac21b6eee..5a43d7118d63 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -29,6 +29,7 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; +pub mod filter_pushdown; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; @@ -36,7 +37,6 @@ pub mod optimizer; pub mod output_requirements; pub mod projection_pushdown; pub mod pruning; -pub mod push_down_filter; pub mod sanity_checker; pub mod topk_aggregation; pub mod update_aggr_exprs; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index d4ff7d6b9e15..78d3e2ad8873 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -25,12 +25,12 @@ use crate::coalesce_batches::CoalesceBatches; use crate::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; +use crate::filter_pushdown::FilterPushdown; use crate::join_selection::JoinSelection; use crate::limit_pushdown::LimitPushdown; use crate::limited_distinct_aggregation::LimitedDistinctAggregation; use crate::output_requirements::OutputRequirements; use crate::projection_pushdown::ProjectionPushdown; -use crate::push_down_filter::PushdownFilter; use crate::sanity_checker::SanityCheckPlan; use crate::topk_aggregation::TopKAggregation; use crate::update_aggr_exprs::OptimizeAggregateOrder; @@ -125,7 +125,7 @@ impl PhysicalOptimizer { // The FilterPushdown rule tries to push down filters as far as it can. // For example, it will push down filtering from a `FilterExec` to // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. - Arc::new(PushdownFilter::new()), + Arc::new(FilterPushdown::new()), // The LimitPushdown rule tries to push limits down as far as possible, // replacing operators with fetching variants, or adding limits // past operators that support limit pushdown. diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 4f0476e8d772..c4bc0dae15a1 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -37,7 +37,7 @@ use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPlan, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, }; use datafusion_common::config::ConfigOptions; use futures::ready; @@ -229,10 +229,13 @@ impl ExecutionPlan for CoalesceBatchesExec { fn gather_filters_for_pushdown( &self, - parent_filters: &[Arc], + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result { - Ok(FilterPushdownPlan::all_supported(parent_filters, 1)) + ) -> Result { + Ok(FilterDescription::all_supported_from_parent( + parent_filters, + 1, + )) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 9dccd0eb7bd1..b53162120322 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,7 +17,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPlan, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -511,10 +511,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// This is the safest option, making filter pushdown opt-in on a per-node pasis. fn gather_filters_for_pushdown( &self, - parent_filters: &[Arc], + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result { - Ok(FilterPushdownPlan::all_unsupported( + ) -> Result { + Ok(FilterDescription::all_unsupported_from_parent( parent_filters, self.children().len(), )) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a76b5e68c585..277b3bd731e6 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -28,7 +28,7 @@ use super::{ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildPushdownResult, FilterPushdown, FilterPushdownPlan, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, PredicateSupport, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -45,7 +45,9 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; @@ -64,6 +66,8 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use log::trace; +const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; + /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. #[derive(Debug, Clone)] @@ -90,7 +94,7 @@ impl FilterExec { ) -> Result { match predicate.data_type(input.schema().as_ref())? { DataType::Boolean => { - let default_selectivity = 20; + let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY; let cache = Self::compute_properties( &input, &predicate, @@ -452,10 +456,12 @@ impl ExecutionPlan for FilterExec { fn gather_filters_for_pushdown( &self, - parent_filters: &[Arc], + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result { - if let Some(projection_indices) = self.projection.as_ref() { + ) -> Result { + let self_filter = Arc::clone(&self.predicate); + + let parent_filters = if let Some(projection_indices) = self.projection.as_ref() { // We need to invert the projection on any referenced columns in the filter // Create a mapping from the output columns to the input columns (the inverse of the projection) let inverse_projection = projection_indices @@ -463,11 +469,11 @@ impl ExecutionPlan for FilterExec { .enumerate() .map(|(i, &p)| (p, i)) .collect::>(); - let parent_filters = parent_filters - .iter() + parent_filters + .into_iter() .map(|f| { - Arc::clone(f) - .transform_up(|expr| { + f.transform_up(|expr| { + let mut res = if let Some(col) = expr.as_any().downcast_ref::() { let index = col.index(); let index_in_input_schema = @@ -477,52 +483,59 @@ impl ExecutionPlan for FilterExec { index )) })?; - Ok(Transformed::yes(Arc::new(Column::new( + Transformed::yes(Arc::new(Column::new( col.name(), *index_in_input_schema, - )) - as _)) + )) as _) } else { - Ok(Transformed::no(expr)) - } - }) - .data() + Transformed::no(expr) + }; + // Columns can only exist in the leaves, no need to try all nodes + res.tnr = TreeNodeRecursion::Jump; + Ok(res) + }) + .data() }) - .collect::>>()?; - Ok(FilterPushdownPlan::all_supported(&parent_filters, 1) - .with_self_filters_for_children(vec![vec![Arc::clone(&self.predicate)]])) + .collect::>>()? } else { - Ok(FilterPushdownPlan::all_supported(parent_filters, 1) - .with_self_filters_for_children(vec![vec![Arc::clone(&self.predicate)]])) - } + parent_filters + }; + + Ok( + FilterDescription::all_supported_from_parent(parent_filters, 1) + .with_self_filters(vec![vec![self_filter]]), + ) } fn handle_child_pushdown_result( &self, - child_pushdown_result: ChildPushdownResult, + mut child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - // We absorb any parent filters that were not handled by our children - let mut unhandled_filters = - child_pushdown_result.parent_filters.collect_unsupported(); - // Was our own predicate handled by our children? assert_eq!( child_pushdown_result.self_filters.len(), 1, "FilterExec should only have one child" ); - let child_filter = child_pushdown_result + assert_eq!( + child_pushdown_result.self_filters[0].len(), + 1, + "FilterExec produces only one filter" + ); + + // We absorb any parent filters that were not handled by our children + let mut unhandled_filters = + child_pushdown_result.parent_filters.collect_unsupported(); + + let self_filters = child_pushdown_result .self_filters - .into_iter() - .next() - .expect("FilterExec should only have one child") - .iter() - .next() - .expect("FilterExec produces only one filter for each child") - .clone(); - if let FilterPushdown::Unsupported(expr) = child_filter { + .swap_remove(0) + .into_inner() + .swap_remove(0); + if let PredicateSupport::Unsupported(expr) = self_filters { unhandled_filters.push(expr); } + // If we have unhandled filters, we need to create a new FilterExec let filter_input = Arc::clone(self.input()); let new_predicate = conjunction(unhandled_filters); @@ -559,8 +572,8 @@ impl ExecutionPlan for FilterExec { ) }; Ok(FilterPushdownPropagation { - parent_filter_result: child_pushdown_result.parent_filters.as_supported(), - new_node: Some(new_exec), + filters: child_pushdown_result.parent_filters.make_supported(), + updated_node: Some(new_exec), }) } } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 63f3a756a256..01a72911ba2e 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -16,98 +16,94 @@ // under the License. use std::sync::Arc; +use std::vec::IntoIter; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -/// The result of or a plan for pushing down a filter into a child node. +/// The result of a plan for pushing down a filter into a child node. /// This contains references to filters so that nodes can mutate a filter /// before pushing it down to a child node (e.g. to adjust a projection) /// or can directly take ownership of `Unsupported` filters that their children /// could not handle. #[derive(Debug, Clone)] -pub enum FilterPushdown { +pub enum PredicateSupport { Supported(Arc), Unsupported(Arc), } -/// A thin wrapper around [`FilterPushdown`]s that allows for easy collection of -/// supported and unsupported filters. +/// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of +/// supported and unsupported filters. Inner vector stores each predicate for one node. #[derive(Debug, Clone)] -pub struct FilterPushdowns(Vec); +pub struct PredicateSupports(Vec); -impl FilterPushdowns { +impl PredicateSupports { /// Create a new FilterPushdowns with the given filters and their pushdown status. - pub fn new(pushdowns: Vec) -> Self { + pub fn new(pushdowns: Vec) -> Self { Self(pushdowns) } - /// Create a new FilterPushdowns with all filters as supported. - pub fn all_supported(filters: &[Arc]) -> Self { + /// Create a new [`PredicateSupports`] with all filters as supported. + pub fn all_supported(filters: Vec>) -> Self { let pushdowns = filters - .iter() - .map(|f| FilterPushdown::Supported(Arc::clone(f))) + .into_iter() + .map(PredicateSupport::Supported) .collect(); Self::new(pushdowns) } - /// Create a new FilterPushdowns with all filters as unsupported. - pub fn all_unsupported(filters: &[Arc]) -> Self { + /// Create a new [`PredicateSupports`] with all filters as unsupported. + pub fn all_unsupported(filters: Vec>) -> Self { let pushdowns = filters - .iter() - .map(|f| FilterPushdown::Unsupported(Arc::clone(f))) + .into_iter() + .map(PredicateSupport::Unsupported) .collect(); Self::new(pushdowns) } /// Transform all filters to supported, returning a new FilterPushdowns. - /// This does not modify the original FilterPushdowns. - pub fn as_supported(&self) -> Self { + /// This does not modify the original [`PredicateSupports`]. + pub fn make_supported(self) -> Self { let pushdowns = self .0 - .iter() + .into_iter() .map(|f| match f { - FilterPushdown::Supported(expr) => { - FilterPushdown::Supported(Arc::clone(expr)) - } - FilterPushdown::Unsupported(expr) => { - FilterPushdown::Supported(Arc::clone(expr)) - } + PredicateSupport::Supported(expr) => PredicateSupport::Supported(expr), + PredicateSupport::Unsupported(expr) => PredicateSupport::Supported(expr), }) .collect(); Self::new(pushdowns) } /// Collect unsupported filters into a Vec, without removing them from the original - /// FilterPushdowns. + /// [`PredicateSupports`]. pub fn collect_unsupported(&self) -> Vec> { self.0 .iter() .filter_map(|f| match f { - FilterPushdown::Unsupported(expr) => Some(Arc::clone(expr)), - FilterPushdown::Supported(_) => None, + PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Supported(_) => None, }) .collect() } - /// Collect all filters as PhysicalExprs into a Vec, without removing them from the original + /// Collect all filters into a Vec, without removing them from the original /// FilterPushdowns. - pub fn into_inner_filters(&self) -> Vec> { + pub fn collect_all(self) -> Vec> { self.0 - .iter() + .into_iter() .map(|f| match f { - FilterPushdown::Supported(expr) => Arc::clone(expr), - FilterPushdown::Unsupported(expr) => Arc::clone(expr), + PredicateSupport::Supported(expr) + | PredicateSupport::Unsupported(expr) => expr, }) .collect() } - /// Return the inner `Vec` without modifying the original FilterPushdowns. - pub fn into_inner(&self) -> Vec { - self.0.clone() + pub fn into_inner(self) -> Vec { + self.0 } /// Return an iterator over the inner `Vec`. - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator { self.0.iter() } @@ -122,6 +118,15 @@ impl FilterPushdowns { } } +impl IntoIterator for PredicateSupports { + type Item = PredicateSupport; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + /// The result of pushing down filters into a child node. /// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`]. /// Nodes process this result and convert it into a [`FilterPushdownPropagation`] @@ -142,10 +147,10 @@ pub struct ChildPushdownResult { /// down into any child then the result is unsupported. /// If at least one children and all children that received the filter mark it as supported /// then the result is supported. - pub parent_filters: FilterPushdowns, + pub parent_filters: PredicateSupports, /// The result of pushing down each filter this node provided into each of it's children. /// This is not combined with the parent filters so that nodes can treat each child independently. - pub self_filters: Vec, + pub self_filters: Vec, } /// The result of pushing down filters into a node that it returns to its parent. @@ -158,8 +163,8 @@ pub struct ChildPushdownResult { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { - pub parent_filter_result: FilterPushdowns, - pub new_node: Option, + pub filters: PredicateSupports, + pub updated_node: Option, } impl FilterPushdownPropagation { @@ -168,56 +173,59 @@ impl FilterPushdownPropagation { /// into the children. pub fn transparent(child_pushdown_result: ChildPushdownResult) -> Self { Self { - parent_filter_result: child_pushdown_result.parent_filters, - new_node: None, + filters: child_pushdown_result.parent_filters, + updated_node: None, } } /// Create a new [`FilterPushdownPropagation`] that tells the parent node /// that none of the parent filters were not pushed down. - pub fn unsupported(parent_filters: &[Arc]) -> Self { - let unsupported = FilterPushdowns::all_unsupported(parent_filters); + pub fn unsupported(parent_filters: Vec>) -> Self { + let unsupported = PredicateSupports::all_unsupported(parent_filters); Self { - parent_filter_result: unsupported, - new_node: None, + filters: unsupported, + updated_node: None, } } } #[derive(Debug, Clone)] -pub struct FilterPushdownPlan { - pub parent_filters_for_children: Vec, - pub self_filters_for_children: Vec>>, +pub struct FilterDescription { + /// Vector storing the [`PredicateSupports`] for each child. + pub parent_filters: Vec, + /// Vector storing the physical expressions for each child. + /// Inner vector is for multiple predicates, if the node stores them such. + pub self_filters: Vec>>, } -impl FilterPushdownPlan { - pub fn all_unsupported( - parent_filters: &[Arc], +impl FilterDescription { + pub fn all_unsupported_from_parent( + parent_filters: Vec>, num_children: usize, ) -> Self { - let unsupported = FilterPushdowns::all_unsupported(parent_filters); + let unsupported = PredicateSupports::all_unsupported(parent_filters); Self { - parent_filters_for_children: vec![unsupported; num_children], - self_filters_for_children: vec![vec![]; num_children], + parent_filters: vec![unsupported; num_children], + self_filters: vec![vec![]; num_children], } } - pub fn all_supported( - parent_filters: &[Arc], + pub fn all_supported_from_parent( + parent_filters: Vec>, num_children: usize, ) -> Self { - let supported = FilterPushdowns::all_supported(parent_filters); + let supported = PredicateSupports::all_supported(parent_filters); Self { - parent_filters_for_children: vec![supported; num_children], - self_filters_for_children: vec![vec![]; num_children], + parent_filters: vec![supported; num_children], + self_filters: vec![vec![]; num_children], } } - pub fn with_self_filters_for_children( + pub fn with_self_filters( mut self, self_filters_for_children: Vec>>, ) -> Self { - self.self_filters_for_children = self_filters_for_children; + self.self_filters = self_filters_for_children; self } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 9b4c3f7d65c4..1f5f94919922 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -54,7 +54,7 @@ use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use crate::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPlan, FilterPushdownPropagation, + ChildPushdownResult, FilterDescription, FilterPushdownPropagation, }; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -745,10 +745,13 @@ impl ExecutionPlan for RepartitionExec { fn gather_filters_for_pushdown( &self, - parent_filters: &[Arc], + parent_filters: Vec>, _config: &ConfigOptions, - ) -> Result { - Ok(FilterPushdownPlan::all_supported(parent_filters, 1)) + ) -> Result { + Ok(FilterDescription::all_supported_from_parent( + parent_filters, + 1, + )) } fn handle_child_pushdown_result( From 907f7c87a0807ae3039f776246d944699e8aade2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 3 May 2025 20:07:35 -0500 Subject: [PATCH 10/19] add more comments --- .../physical-plan/src/coalesce_batches.rs | 6 +- .../physical-plan/src/execution_plan.rs | 15 +++-- datafusion/physical-plan/src/filter.rs | 7 +-- .../physical-plan/src/filter_pushdown.rs | 59 ++++++++++++++----- .../physical-plan/src/repartition/mod.rs | 6 +- 5 files changed, 63 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index c4bc0dae15a1..a0dd7371b4a0 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -232,10 +232,8 @@ impl ExecutionPlan for CoalesceBatchesExec { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::all_supported_from_parent( - parent_filters, - 1, - )) + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index b53162120322..b81b3c8beeac 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -514,10 +514,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::all_unsupported_from_parent( - parent_filters, - self.children().len(), - )) + Ok( + FilterDescription::new_with_child_count(self.children().len()) + .all_parent_filters_unsupported(parent_filters), + ) } /// Handle the result of a child pushdown. @@ -527,9 +527,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// (also known as late materialization). /// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it /// may remove itself from the plan altogether. + /// It combines both [`ChildPushdownResult::parent_filters`] and [`ChildPushdownResult::self_filters`] into a single + /// predicate and replaces it's own predicate. + /// Then it passes [`PredicateSupport::Supported`] for each parent predicate to the parent. /// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow. + /// It passes [`ChildPushdownResult::parent_filters`] back up to it's parents wrapped in [`FilterPushdownPropagation::transparent`] + /// and [`ChildPushdownResult::self_filters`] is discarded. /// /// The default implementation is a no-op that passes the result of pushdown from the children to its parent. + /// + /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported fn handle_child_pushdown_result( &self, child_pushdown_result: ChildPushdownResult, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 277b3bd731e6..9f5d9dc2984e 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -501,10 +501,9 @@ impl ExecutionPlan for FilterExec { parent_filters }; - Ok( - FilterDescription::all_supported_from_parent(parent_filters, 1) - .with_self_filters(vec![vec![self_filter]]), - ) + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters) + .with_self_filter(self_filter)) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 01a72911ba2e..f5c9f9f2da52 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -191,6 +191,7 @@ impl FilterPushdownPropagation { #[derive(Debug, Clone)] pub struct FilterDescription { + num_children: usize, /// Vector storing the [`PredicateSupports`] for each child. pub parent_filters: Vec, /// Vector storing the physical expressions for each child. @@ -199,33 +200,63 @@ pub struct FilterDescription { } impl FilterDescription { - pub fn all_unsupported_from_parent( - parent_filters: Vec>, - num_children: usize, - ) -> Self { - let unsupported = PredicateSupports::all_unsupported(parent_filters); + pub fn new_with_child_count(num_children: usize) -> Self { Self { - parent_filters: vec![unsupported; num_children], + num_children, + parent_filters: Vec::with_capacity(num_children), self_filters: vec![vec![]; num_children], } } - pub fn all_supported_from_parent( + /// Mark all parent filters as supported for all children. + /// This is the case if the node allows filters to be pushed down through it + /// without any modification. + /// This broadcasts the parent filters to all children. + /// If handling of parent filters is different for each child then you should set the + /// field direclty. + /// For example, nodes like [`RepartitionExec`] that let filters pass through it transparently + /// use this to mark all parent filters as supported. + pub fn all_parent_filters_supported( + self, parent_filters: Vec>, - num_children: usize, ) -> Self { let supported = PredicateSupports::all_supported(parent_filters); Self { - parent_filters: vec![supported; num_children], - self_filters: vec![vec![]; num_children], + num_children: self.num_children, + parent_filters: vec![supported; self.num_children], + self_filters: self.self_filters, } } - pub fn with_self_filters( - mut self, - self_filters_for_children: Vec>>, + /// Mark all parent filters as unsupported for all children. + /// This is the case if the node does not allow filters to be pushed down through it. + /// This broadcasts the parent filters to all children. + /// If handling of parent filters is different for each child then you should set the + /// field direclty. + /// For example, the default implementation of filter pushdwon in [`ExecutionPlan`] + /// assumes that filters cannot be pushed down to children. + pub fn all_parent_filters_unsupported( + self, + parent_filters: Vec>, ) -> Self { - self.self_filters = self_filters_for_children; + let unsupported = PredicateSupports::all_unsupported(parent_filters); + Self { + num_children: self.num_children, + parent_filters: vec![unsupported; self.num_children], + self_filters: self.self_filters, + } + } + + /// Add a filter generated / owned by the current node to be pushed down to all children. + /// This assumes that there is a single filter that that gets pushed down to all children + /// equally. + /// If there are multiple filters or pushdown to children is not homogeneous then + /// you should set the field directly. + /// For example: + /// - `TopK` uses this to push down a single filter to all children, it can use this method. + /// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method. + pub fn with_self_filter(mut self, predicate: Arc) -> Self { + self.self_filters = vec![vec![predicate]; self.num_children]; self } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 1f5f94919922..c86a37697a05 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -748,10 +748,8 @@ impl ExecutionPlan for RepartitionExec { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::all_supported_from_parent( - parent_filters, - 1, - )) + Ok(FilterDescription::new_with_child_count(1) + .all_parent_filters_supported(parent_filters)) } fn handle_child_pushdown_result( From d0c1014001b00c3c8b7af6077e277f5517e84a62 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 3 May 2025 21:06:39 -0500 Subject: [PATCH 11/19] fix doc links --- datafusion/physical-plan/src/filter_pushdown.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index f5c9f9f2da52..8b9ab6cce5f2 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -216,6 +216,8 @@ impl FilterDescription { /// field direclty. /// For example, nodes like [`RepartitionExec`] that let filters pass through it transparently /// use this to mark all parent filters as supported. + /// + /// [`RepartitionExec`]: crate::repartition::RepartitionExec pub fn all_parent_filters_supported( self, parent_filters: Vec>, @@ -235,6 +237,8 @@ impl FilterDescription { /// field direclty. /// For example, the default implementation of filter pushdwon in [`ExecutionPlan`] /// assumes that filters cannot be pushed down to children. + /// + /// [`ExecutionPlan`]: crate::ExecutionPlan pub fn all_parent_filters_unsupported( self, parent_filters: Vec>, From 3b4c4fa1e89e2bbda81639c3414791cffd99d346 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 4 May 2025 12:20:10 +0200 Subject: [PATCH 12/19] fmt --- datafusion/physical-plan/src/filter_pushdown.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 8b9ab6cce5f2..88ee977ade23 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -216,7 +216,7 @@ impl FilterDescription { /// field direclty. /// For example, nodes like [`RepartitionExec`] that let filters pass through it transparently /// use this to mark all parent filters as supported. - /// + /// /// [`RepartitionExec`]: crate::repartition::RepartitionExec pub fn all_parent_filters_supported( self, @@ -237,7 +237,7 @@ impl FilterDescription { /// field direclty. /// For example, the default implementation of filter pushdwon in [`ExecutionPlan`] /// assumes that filters cannot be pushed down to children. - /// + /// /// [`ExecutionPlan`]: crate::ExecutionPlan pub fn all_parent_filters_unsupported( self, From a4466622d1beee5a25d80d4fd7fba9d3cbfdfbfa Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 11:43:39 +0200 Subject: [PATCH 13/19] add comments --- .../physical-optimizer/src/filter_pushdown.rs | 4 +- .../physical-plan/src/filter_pushdown.rs | 74 ++++++++++++++----- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index ca4c5920a224..6c445458b51b 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -422,8 +422,8 @@ fn push_down_filters( for (child, parent_filters, self_filters) in izip!( children, - filter_description.parent_filters, - filter_description.self_filters + filter_description.parent_filters(), + filter_description.self_filters() ) { // Here, `parent_filters` are the predicates which are provided by the parent node of // the current node, and tried to be pushed down over the child which the loop points diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 88ee977ade23..0003fc9d7277 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -189,25 +189,59 @@ impl FilterPushdownPropagation { } } +#[derive(Debug, Clone)] +struct ChildFilterDescription { + /// Description of which parent filters can be pushed down into this node. + /// Since we need to transmit filter pushdown results back to this node's parent + /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down. + /// We do this using a [`PredicateSupports`] which simplifies manipulating supported/unsupported filters. + parent_filters: PredicateSupports, + /// Description of which filters this node is pushing down to its children. + /// Since this is not transmitted back to the parents we can have variable sized inner arrays + /// instead of having to track supported/unsupported. + self_filters: Vec>, +} + +impl ChildFilterDescription { + fn new() -> Self { + Self { + parent_filters: PredicateSupports::new(vec![]), + self_filters: vec![], + } + } +} + #[derive(Debug, Clone)] pub struct FilterDescription { - num_children: usize, - /// Vector storing the [`PredicateSupports`] for each child. - pub parent_filters: Vec, - /// Vector storing the physical expressions for each child. - /// Inner vector is for multiple predicates, if the node stores them such. - pub self_filters: Vec>>, + /// A filter description for each child. + /// This includes which parent filters and which self filters (from the node in question) + /// will get pushed down to each child. + child_filter_descriptions: Vec, } impl FilterDescription { pub fn new_with_child_count(num_children: usize) -> Self { Self { - num_children, - parent_filters: Vec::with_capacity(num_children), - self_filters: vec![vec![]; num_children], + child_filter_descriptions: vec![ChildFilterDescription::new(); num_children], } } + pub fn parent_filters(&self) -> Vec { + self.child_filter_descriptions + .iter() + .map(|d| &d.parent_filters) + .cloned() + .collect() + } + + pub fn self_filters(&self) -> Vec>> { + self.child_filter_descriptions + .iter() + .map(|d| &d.self_filters) + .cloned() + .collect() + } + /// Mark all parent filters as supported for all children. /// This is the case if the node allows filters to be pushed down through it /// without any modification. @@ -219,15 +253,14 @@ impl FilterDescription { /// /// [`RepartitionExec`]: crate::repartition::RepartitionExec pub fn all_parent_filters_supported( - self, + mut self, parent_filters: Vec>, ) -> Self { let supported = PredicateSupports::all_supported(parent_filters); - Self { - num_children: self.num_children, - parent_filters: vec![supported; self.num_children], - self_filters: self.self_filters, + for child in &mut self.child_filter_descriptions { + child.parent_filters = supported.clone(); } + self } /// Mark all parent filters as unsupported for all children. @@ -240,15 +273,14 @@ impl FilterDescription { /// /// [`ExecutionPlan`]: crate::ExecutionPlan pub fn all_parent_filters_unsupported( - self, + mut self, parent_filters: Vec>, ) -> Self { let unsupported = PredicateSupports::all_unsupported(parent_filters); - Self { - num_children: self.num_children, - parent_filters: vec![unsupported; self.num_children], - self_filters: self.self_filters, + for child in &mut self.child_filter_descriptions { + child.parent_filters = unsupported.clone(); } + self } /// Add a filter generated / owned by the current node to be pushed down to all children. @@ -260,7 +292,9 @@ impl FilterDescription { /// - `TopK` uses this to push down a single filter to all children, it can use this method. /// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method. pub fn with_self_filter(mut self, predicate: Arc) -> Self { - self.self_filters = vec![vec![predicate]; self.num_children]; + for child in &mut self.child_filter_descriptions { + child.self_filters = vec![Arc::clone(&predicate)]; + } self } } From 1103c1ad85b84884e373c06aefb6b7cf78b49aef Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 12:09:27 +0200 Subject: [PATCH 14/19] make test deterministic --- .../sqllogictest/test_files/parquet_filter_pushdown.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index a32db2ff0524..252704f260b8 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -187,13 +187,13 @@ COPY ( STORED AS PARQUET; query TT -select * from t_pushdown where part == val +select * from t_pushdown where part == val order by part, val; ---- a a b b query TT -select * from t_pushdown where part != val +select * from t_pushdown where part != val order by part, val; ---- xyz c From 40a71c6fc2a9e148ac8b35159fed81905c36164a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 12:18:30 +0200 Subject: [PATCH 15/19] add bench --- datafusion/core/benches/push_down_filter.rs | 125 ++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 datafusion/core/benches/push_down_filter.rs diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs new file mode 100644 index 000000000000..b6e7a2299794 --- /dev/null +++ b/datafusion/core/benches/push_down_filter.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Field, Schema}; +use bytes::{BufMut, BytesMut}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion::config::ConfigOptions; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_optimizer::push_down_filter::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::ExecutionPlan; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::ObjectStore; +use parquet::arrow::ArrowWriter; +use std::sync::Arc; + +async fn create_plan() -> Arc { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + Field::new("age", DataType::UInt16, true), + Field::new("salary", DataType::Float64, true), + ])); + let batch = RecordBatch::new_empty(schema); + + let store = Arc::new(InMemory::new()) as Arc; + let data = { + let out = BytesMut::new(); + let mut writer = + ArrowWriter::try_new(out.writer(), batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + writer.into_inner().unwrap().into_inner().freeze() + }; + store + .put(&Path::from("test.parquet"), data.into()) + .await + .unwrap(); + ctx.register_object_store( + ObjectStoreUrl::parse("memory://").unwrap().as_ref(), + store, + ); + + ctx.register_parquet("t", "memory://", ParquetReadOptions::default()) + .await + .unwrap(); + + let df = ctx + .sql( + r" + WITH brackets AS ( + SELECT age % 10 AS age_bracket + FROM t + GROUP BY age % 10 + HAVING COUNT(*) > 10 + ) + SELECT id, name, age, salary + FROM t + JOIN brackets ON t.age % 10 = brackets.age_bracket + WHERE age > 20 AND data.salary > 1000 + ORDER BY data.salary DESC + LIMIT 100 + ", + ) + .await + .unwrap(); + + df.create_physical_plan().await.unwrap() +} + +#[derive(Clone)] +struct BenchmarkPlan { + plan: Arc, + config: ConfigOptions, +} + +impl std::fmt::Display for BenchmarkPlan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BenchmarkPlan") + } +} + +fn bench_push_down_filter(c: &mut Criterion) { + // Create a relatively complex plan + let plan = tokio::runtime::Runtime::new() + .unwrap() + .block_on(create_plan()); + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + let plan = BenchmarkPlan { plan, config }; + + c.bench_with_input( + BenchmarkId::new("push_down_filter", plan.clone()), + &plan, + |b, plan| { + b.iter(|| { + let optimizer = PushdownFilter::new(); + optimizer + .optimize(Arc::clone(&plan.plan), &plan.config) + .unwrap(); + }); + }, + ); +} + +criterion_group!(benches, bench_push_down_filter); +criterion_main!(benches); From 2e5d6351ba844942e90f1289e6128477702492bd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 12:29:18 +0200 Subject: [PATCH 16/19] fix bench --- datafusion/core/benches/push_down_filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs index b6e7a2299794..7ed777e3dac4 100644 --- a/datafusion/core/benches/push_down_filter.rs +++ b/datafusion/core/benches/push_down_filter.rs @@ -22,7 +22,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use datafusion::config::ConfigOptions; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_physical_optimizer::push_down_filter::PushdownFilter; +use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::ExecutionPlan; use object_store::memory::InMemory; @@ -112,7 +112,7 @@ fn bench_push_down_filter(c: &mut Criterion) { &plan, |b, plan| { b.iter(|| { - let optimizer = PushdownFilter::new(); + let optimizer = FilterPushdown::new(); optimizer .optimize(Arc::clone(&plan.plan), &plan.config) .unwrap(); From a198143c1f93bb7944d1597268857f4cc8dddb78 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 12:54:37 +0200 Subject: [PATCH 17/19] register bench --- datafusion/core/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3ace3e14ec25..4b6d8f274932 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -179,6 +179,10 @@ name = "csv_load" harness = false name = "distinct_query_sql" +[[bench]] +harness = false +name = "push_down_filter" + [[bench]] harness = false name = "sort_limit_query_sql" From cb6399cb70e6fab3e6ddd6395349593d9b942f51 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 5 May 2025 14:20:43 +0200 Subject: [PATCH 18/19] fix bench --- datafusion/core/benches/push_down_filter.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs index 7ed777e3dac4..4ed2220637b1 100644 --- a/datafusion/core/benches/push_down_filter.rs +++ b/datafusion/core/benches/push_down_filter.rs @@ -42,14 +42,14 @@ async fn create_plan() -> Arc { let batch = RecordBatch::new_empty(schema); let store = Arc::new(InMemory::new()) as Arc; - let data = { - let out = BytesMut::new(); + let mut out = BytesMut::new().writer(); + { let mut writer = - ArrowWriter::try_new(out.writer(), batch.schema(), None).unwrap(); + ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); - writer.into_inner().unwrap().into_inner().freeze() - }; + } + let data = out.into_inner().freeze(); store .put(&Path::from("test.parquet"), data.into()) .await @@ -59,7 +59,7 @@ async fn create_plan() -> Arc { store, ); - ctx.register_parquet("t", "memory://", ParquetReadOptions::default()) + ctx.register_parquet("t", "memory:///", ParquetReadOptions::default()) .await .unwrap(); @@ -75,8 +75,8 @@ async fn create_plan() -> Arc { SELECT id, name, age, salary FROM t JOIN brackets ON t.age % 10 = brackets.age_bracket - WHERE age > 20 AND data.salary > 1000 - ORDER BY data.salary DESC + WHERE age > 20 AND t.salary > 1000 + ORDER BY t.salary DESC LIMIT 100 ", ) From 98e91f7e4c57d17bb515dd81c41573a2ad34a287 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Mon, 5 May 2025 16:08:02 +0300 Subject: [PATCH 19/19] cargo fmt --- datafusion/core/benches/push_down_filter.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs index 4ed2220637b1..92de1711a9e8 100644 --- a/datafusion/core/benches/push_down_filter.rs +++ b/datafusion/core/benches/push_down_filter.rs @@ -44,8 +44,7 @@ async fn create_plan() -> Arc { let store = Arc::new(InMemory::new()) as Arc; let mut out = BytesMut::new().writer(); { - let mut writer = - ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); + let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); }