diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index dd5736806eeb..55971f6f627c 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -45,6 +45,9 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::projection::{
+    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
+};
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -479,6 +482,36 @@ impl ExecutionPlan for CsvExec {
             cache: self.cache.clone(),
         }))
     }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If there is any non-column or alias-carrying expression, the projection should not be removed.
+        // This check used to live in the optimizer rule; it is now each operator's own responsibility.
+        Ok(all_alias_free_columns(projection.expr()).then(|| {
+            let mut file_scan = self.base_config().clone();
+            let new_projections = new_projections_for_columns(
+                projection,
+                &file_scan
+                    .projection
+                    .unwrap_or((0..self.schema().fields().len()).collect()),
+            );
+            file_scan.projection = Some(new_projections);
+
+            Arc::new(
+                CsvExec::builder(file_scan)
+                    .with_has_header(self.has_header())
+                    .with_delimeter(self.delimiter())
+                    .with_quote(self.quote())
+                    .with_escape(self.escape())
+                    .with_comment(self.comment())
+                    .with_newlines_in_values(self.newlines_in_values())
+                    .with_file_compression_type(self.file_compression_type)
+                    .build(),
+            ) as _
+        }))
+    }
 }
 
 /// A Config for [`CsvOpener`]
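Because every operator now owns its swap logic, the optimizer rule shrinks to a single traversal. A minimal sketch of driving the rewrite, assuming a prebuilt physical plan (`push_down_projections` is illustrative; the rule's actual `optimize` body is elided from the hunk below):

```rust
use std::sync::Arc;

use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_plan::projection::remove_unnecessary_projections;
use datafusion_physical_plan::ExecutionPlan;

/// Walk the plan top-down; at each node, `remove_unnecessary_projections`
/// drops no-op projections and otherwise asks the child operator, via
/// `ExecutionPlan::try_swapping_with_projection`, to absorb or swap with
/// the projection sitting above it.
fn push_down_projections(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_down(remove_unnecessary_projections).data()
}
```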
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index cf8d5e352ef1..46fddbd3c936 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -20,46 +20,18 @@
 //! projections one by one if the operator below is amenable to this. If a
 //! projection reaches a source, it can even disappear from the plan entirely.
 
-use std::collections::HashMap;
 use std::sync::Arc;
 
-use super::output_requirements::OutputRequirementExec;
-use crate::datasource::physical_plan::CsvExec;
 use crate::error::Result;
-use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::filter::FilterExec;
-use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
-use crate::physical_plan::joins::{
-    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
-    SymmetricHashJoinExec,
-};
-use crate::physical_plan::memory::MemoryExec;
-use crate::physical_plan::projection::ProjectionExec;
-use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-use crate::physical_plan::{Distribution, ExecutionPlan, ExecutionPlanProperties};
-
-use arrow_schema::SchemaRef;
+use crate::physical_plan::ExecutionPlan;
+
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{
-    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
-};
-use datafusion_common::{internal_err, JoinSide};
-use datafusion_physical_expr::expressions::{Column, Literal};
-use datafusion_physical_expr::{
-    utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
-    PhysicalSortExpr, PhysicalSortRequirement,
-};
-use datafusion_physical_plan::joins::utils::{JoinOn, JoinOnRef};
-use datafusion_physical_plan::streaming::StreamingTableExec;
-use datafusion_physical_plan::union::UnionExec;
-
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use datafusion_common::tree_node::{TransformedResult, TreeNode};
+
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
-use itertools::Itertools;
+use datafusion_physical_plan::projection::remove_unnecessary_projections;
 
-/// This rule inspects [`ProjectionExec`]'s in the given physical plan and tries to
+/// This rule inspects `ProjectionExec`s in the given physical plan and tries to
 /// remove or swap with its child.
 #[derive(Default, Debug)]
 pub struct ProjectionPushdown {}
@@ -89,1290 +61,51 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
     }
 }
 
-/// This function checks if `plan` is a [`ProjectionExec`], and inspects its
-/// input(s) to test whether it can push `plan` under its input(s). This function
-/// will operate on the entire tree and may ultimately remove `plan` entirely
-/// by leveraging source providers with built-in projection capabilities.
-pub fn remove_unnecessary_projections(
-    plan: Arc<dyn ExecutionPlan>,
-) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
-    let maybe_modified = if let Some(projection) =
-        plan.as_any().downcast_ref::<ProjectionExec>()
-    {
-        // If the projection does not cause any change on the input, we can
-        // safely remove it:
-        if is_projection_removable(projection) {
-            return Ok(Transformed::yes(Arc::clone(projection.input())));
-        }
-        // If it does, check if we can push it under its child(ren):
-        let input = projection.input().as_any();
-        if let Some(csv) = input.downcast_ref::<CsvExec>() {
-            try_swapping_with_csv(projection, csv)
-        } else if let Some(memory) = input.downcast_ref::<MemoryExec>() {
-            try_swapping_with_memory(projection, memory)?
-        } else if let Some(child_projection) = input.downcast_ref::<ProjectionExec>() {
-            let maybe_unified = try_unifying_projections(projection, child_projection)?;
-            return if let Some(new_plan) = maybe_unified {
-                // To unify 3 or more sequential projections:
-                remove_unnecessary_projections(new_plan)
-                    .data()
-                    .map(Transformed::yes)
-            } else {
-                Ok(Transformed::no(plan))
-            };
-        } else if let Some(output_req) = input.downcast_ref::<OutputRequirementExec>() {
-            try_swapping_with_output_req(projection, output_req)?
-        } else if input.is::<CoalescePartitionsExec>() {
-            try_swapping_with_coalesce_partitions(projection)?
-        } else if let Some(filter) = input.downcast_ref::<FilterExec>() {
-            try_swapping_with_filter(projection, filter)?.map_or_else(
-                || try_embed_projection(projection, filter),
-                |e| Ok(Some(e)),
-            )?
-        } else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
-            try_swapping_with_repartition(projection, repartition)?
-        } else if let Some(sort) = input.downcast_ref::<SortExec>() {
-            try_swapping_with_sort(projection, sort)?
-        } else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() {
-            try_swapping_with_sort_preserving_merge(projection, spm)?
-        } else if let Some(union) = input.downcast_ref::<UnionExec>() {
-            try_pushdown_through_union(projection, union)?
-        } else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() {
-            try_pushdown_through_hash_join(projection, hash_join)?
-        } else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
-            try_swapping_with_cross_join(projection, cross_join)?
-        } else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() {
-            try_pushdown_through_nested_loop_join(projection, nl_join)?
-        } else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
-            try_swapping_with_sort_merge_join(projection, sm_join)?
-        } else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
-            try_swapping_with_sym_hash_join(projection, sym_join)?
-        } else if let Some(ste) = input.downcast_ref::<StreamingTableExec>() {
-            try_swapping_with_streaming_table(projection, ste)?
-        } else {
-            // If the input plan of the projection is not one of the above, we
-            // conservatively assume that pushing the projection down may hurt.
-            // When adding new operators, consider adding them here if you
-            // think pushing projections under them is beneficial.
-            None
-        }
-    } else {
-        return Ok(Transformed::no(plan));
-    };
-
-    Ok(maybe_modified.map_or(Transformed::no(plan), Transformed::yes))
-}
-
-/// Tries to embed `projection` to its input (`csv`). If possible, returns
-/// [`CsvExec`] as the top plan. Otherwise, returns `None`.
-fn try_swapping_with_csv(
-    projection: &ProjectionExec,
-    csv: &CsvExec,
-) -> Option<Arc<dyn ExecutionPlan>> {
-    // If there is any non-column or alias-carrier expression, Projection should not be removed.
-    // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
-    all_alias_free_columns(projection.expr()).then(|| {
-        let mut file_scan = csv.base_config().clone();
-        let new_projections = new_projections_for_columns(
-            projection,
-            &file_scan
-                .projection
-                .unwrap_or((0..csv.schema().fields().len()).collect()),
-        );
-        file_scan.projection = Some(new_projections);
-
-        Arc::new(
-            CsvExec::builder(file_scan)
-                .with_has_header(csv.has_header())
-                .with_delimeter(csv.delimiter())
-                .with_quote(csv.quote())
-                .with_escape(csv.escape())
-                .with_comment(csv.comment())
-                .with_newlines_in_values(csv.newlines_in_values())
-                .with_file_compression_type(csv.file_compression_type)
-                .build(),
-        ) as _
-    })
-}
-
-/// Tries to embed `projection` to its input (`memory`). If possible, returns
-/// [`MemoryExec`] as the top plan. Otherwise, returns `None`.
-fn try_swapping_with_memory(
-    projection: &ProjectionExec,
-    memory: &MemoryExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If there is any non-column or alias-carrier expression, Projection should not be removed.
-    // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
-    all_alias_free_columns(projection.expr())
-        .then(|| {
-            let all_projections = (0..memory.schema().fields().len()).collect();
-            let new_projections = new_projections_for_columns(
-                projection,
-                memory.projection().as_ref().unwrap_or(&all_projections),
-            );
-
-            MemoryExec::try_new(
-                memory.partitions(),
-                memory.original_schema(),
-                Some(new_projections),
-            )
-            .map(|e| Arc::new(e) as _)
-        })
-        .transpose()
-}
-
-/// Tries to embed `projection` to its input (`streaming table`).
-/// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
-/// returns `None`.
-fn try_swapping_with_streaming_table(
-    projection: &ProjectionExec,
-    streaming_table: &StreamingTableExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    if !all_alias_free_columns(projection.expr()) {
-        return Ok(None);
-    }
-
-    let streaming_table_projections = streaming_table
-        .projection()
-        .as_ref()
-        .map(|i| i.as_ref().to_vec());
-    let new_projections = new_projections_for_columns(
-        projection,
-        &streaming_table_projections
-            .unwrap_or((0..streaming_table.schema().fields().len()).collect()),
-    );
-
-    let mut lex_orderings = vec![];
-    for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
-        let mut orderings = LexOrdering::default();
-        for order in lex_ordering {
-            let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)?
-            else {
-                return Ok(None);
-            };
-            orderings.push(PhysicalSortExpr {
-                expr: new_ordering,
-                options: order.options,
-            });
-        }
-        lex_orderings.push(orderings);
-    }
-
-    StreamingTableExec::try_new(
-        Arc::clone(streaming_table.partition_schema()),
-        streaming_table.partitions().clone(),
-        Some(new_projections.as_ref()),
-        lex_orderings,
-        streaming_table.is_infinite(),
-        streaming_table.limit(),
-    )
-    .map(|e| Some(Arc::new(e) as _))
-}
-
-/// Unifies `projection` with its input (which is also a [`ProjectionExec`]).
-fn try_unifying_projections(
-    projection: &ProjectionExec,
-    child: &ProjectionExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    let mut projected_exprs = vec![];
-    let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
-
-    // Collect the column references usage in the outer projection.
-    projection.expr().iter().for_each(|(expr, _)| {
-        expr.apply(|expr| {
-            Ok({
-                if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-                    *column_ref_map.entry(column.clone()).or_default() += 1;
-                }
-                TreeNodeRecursion::Continue
-            })
-        })
-        .unwrap();
-    });
-
-    // Merging these projections is not beneficial, e.g
-    // If an expression is not trivial and it is referred more than 1, unifies projections will be
-    // beneficial as caching mechanism for non-trivial computations.
-    // See discussion in: https://github.com/apache/datafusion/issues/8296
-    if column_ref_map.iter().any(|(column, count)| {
-        *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].0))
-    }) {
-        return Ok(None);
-    }
-
-    for (expr, alias) in projection.expr() {
-        // If there is no match in the input projection, we cannot unify these
-        // projections. This case will arise if the projection expression contains
-        // a `PhysicalExpr` variant `update_expr` doesn't support.
-        let Some(expr) = update_expr(expr, child.expr(), true)?
-        else {
-            return Ok(None);
-        };
-        projected_exprs.push((expr, alias.clone()));
-    }
-
-    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
-        .map(|e| Some(Arc::new(e) as _))
-}
-
-/// Checks if the given expression is trivial.
-/// An expression is considered trivial if it is either a `Column` or a `Literal`.
-fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
-    expr.as_any().downcast_ref::<Column>().is_some()
-        || expr.as_any().downcast_ref::<Literal>().is_some()
-}
-
-/// Tries to swap `projection` with its input (`output_req`). If possible,
-/// performs the swap and returns [`OutputRequirementExec`] as the top plan.
-/// Otherwise, returns `None`.
-fn try_swapping_with_output_req(
-    projection: &ProjectionExec,
-    output_req: &OutputRequirementExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection does not narrow the schema, we should not try to push it down:
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-
-    let mut updated_sort_reqs = LexRequirement::new(vec![]);
-    // None or empty_vec can be treated in the same way.
-    if let Some(reqs) = &output_req.required_input_ordering()[0] {
-        for req in &reqs.inner {
-            let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)? else {
-                return Ok(None);
-            };
-            updated_sort_reqs.push(PhysicalSortRequirement {
-                expr: new_expr,
-                options: req.options,
-            });
-        }
-    }
-
-    let dist_req = match &output_req.required_input_distribution()[0] {
-        Distribution::HashPartitioned(exprs) => {
-            let mut updated_exprs = vec![];
-            for expr in exprs {
-                let Some(new_expr) = update_expr(expr, projection.expr(), false)? else {
-                    return Ok(None);
-                };
-                updated_exprs.push(new_expr);
-            }
-            Distribution::HashPartitioned(updated_exprs)
-        }
-        dist => dist.clone(),
-    };
-
-    make_with_child(projection, &output_req.input())
-        .map(|input| {
-            OutputRequirementExec::new(
-                input,
-                (!updated_sort_reqs.is_empty()).then_some(updated_sort_reqs),
-                dist_req,
-            )
-        })
-        .map(|e| Some(Arc::new(e) as _))
-}
-
-/// Tries to swap `projection` with its input, which is known to be a
-/// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
-/// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
-fn try_swapping_with_coalesce_partitions(
-    projection: &ProjectionExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection does not narrow the schema, we should not try to push it down:
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-    // CoalescePartitionsExec always has a single child, so zero indexing is safe.
-    make_with_child(projection, projection.input().children()[0])
-        .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
-}
-
-/// Tries to swap `projection` with its input (`filter`). If possible, performs
-/// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`.
-fn try_swapping_with_filter(
-    projection: &ProjectionExec,
-    filter: &FilterExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection does not narrow the schema, we should not try to push it down:
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-    // Each column in the predicate expression must exist after the projection.
-    let Some(new_predicate) = update_expr(filter.predicate(), projection.expr(), false)?
-    else {
-        return Ok(None);
-    };
-
-    FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
-        .and_then(|e| {
-            let selectivity = filter.default_selectivity();
-            e.with_default_selectivity(selectivity)
-        })
-        .map(|e| Some(Arc::new(e) as _))
-}
-
-/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,
-/// it returns the new swapped version having the [`RepartitionExec`] as the top plan.
-/// Otherwise, it returns None.
-fn try_swapping_with_repartition(
-    projection: &ProjectionExec,
-    repartition: &RepartitionExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection does not narrow the schema, we should not try to push it down.
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-
-    // If pushdown is not beneficial or applicable, break it.
-    if projection.benefits_from_input_partitioning()[0] || !all_columns(projection.expr())
-    {
-        return Ok(None);
-    }
-
-    let new_projection = make_with_child(projection, repartition.input())?;
-
-    let new_partitioning = match repartition.partitioning() {
-        Partitioning::Hash(partitions, size) => {
-            let mut new_partitions = vec![];
-            for partition in partitions {
-                let Some(new_partition) =
-                    update_expr(partition, projection.expr(), false)?
-                else {
-                    return Ok(None);
-                };
-                new_partitions.push(new_partition);
-            }
-            Partitioning::Hash(new_partitions, *size)
-        }
-        others => others.clone(),
-    };
-
-    Ok(Some(Arc::new(RepartitionExec::try_new(
-        new_projection,
-        new_partitioning,
-    )?)))
-}
-
-/// Tries to swap the projection with its input [`SortExec`]. If it can be done,
-/// it returns the new swapped version having the [`SortExec`] as the top plan.
-/// Otherwise, it returns None.
-fn try_swapping_with_sort(
-    projection: &ProjectionExec,
-    sort: &SortExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection does not narrow the schema, we should not try to push it down.
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-
-    let mut updated_exprs = LexOrdering::default();
-    for sort in sort.expr() {
-        let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)? else {
-            return Ok(None);
-        };
-        updated_exprs.push(PhysicalSortExpr {
-            expr: new_expr,
-            options: sort.options,
-        });
-    }
-
-    Ok(Some(Arc::new(
-        SortExec::new(updated_exprs, make_with_child(projection, sort.input())?)
-            .with_fetch(sort.fetch())
-            .with_preserve_partitioning(sort.preserve_partitioning()),
-    )))
-}
-
-/// Tries to swap the projection with its input [`SortPreservingMergeExec`].
-/// If this is possible, it returns the new [`SortPreservingMergeExec`] whose
-/// child is a projection. Otherwise, it returns None.
-fn try_swapping_with_sort_preserving_merge(
-    projection: &ProjectionExec,
-    spm: &SortPreservingMergeExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection does not narrow the schema, we should not try to push it down.
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-
-    let mut updated_exprs = LexOrdering::default();
-    for sort in spm.expr() {
-        let Some(updated_expr) = update_expr(&sort.expr, projection.expr(), false)?
-        else {
-            return Ok(None);
-        };
-        updated_exprs.push(PhysicalSortExpr {
-            expr: updated_expr,
-            options: sort.options,
-        });
-    }
-
-    Ok(Some(Arc::new(
-        SortPreservingMergeExec::new(
-            updated_exprs,
-            make_with_child(projection, spm.input())?,
-        )
-        .with_fetch(spm.fetch()),
-    )))
-}
-
-/// Tries to push `projection` down through `union`. If possible, performs the
-/// pushdown and returns a new [`UnionExec`] as the top plan which has projections
-/// as its children. Otherwise, returns `None`.
-fn try_pushdown_through_union(
-    projection: &ProjectionExec,
-    union: &UnionExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // If the projection doesn't narrow the schema, we shouldn't try to push it down.
-    if projection.expr().len() >= projection.input().schema().fields().len() {
-        return Ok(None);
-    }
-
-    let new_children = union
-        .children()
-        .into_iter()
-        .map(|child| make_with_child(projection, child))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Some(Arc::new(UnionExec::new(new_children))))
-}
-
-trait EmbeddedProjection: ExecutionPlan + Sized {
-    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
-}
-
-impl EmbeddedProjection for HashJoinExec {
-    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
-        self.with_projection(projection)
-    }
-}
-
-impl EmbeddedProjection for NestedLoopJoinExec {
-    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
-        self.with_projection(projection)
-    }
-}
-
-impl EmbeddedProjection for FilterExec {
-    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
-        self.with_projection(projection)
-    }
-}
-
-/// Some projection can't be pushed down left input or right input of hash join because filter or on need may need some columns that won't be used in later.
-/// By embed those projection to hash join, we can reduce the cost of build_batch_from_indices in hash join (build_batch_from_indices need to can compute::take() for each column) and avoid unnecessary output creation.
-fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
-    projection: &ProjectionExec,
-    execution_plan: &Exec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // Collect all column indices from the given projection expressions.
-    let projection_index = collect_column_indices(projection.expr());
-
-    if projection_index.is_empty() {
-        return Ok(None);
-    };
-
-    // If the projection indices is the same as the input columns, we don't need to embed the projection to hash join.
-    // Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of execution_plan schema fields.
-    if projection_index.len() == projection_index.last().unwrap() + 1
-        && projection_index.len() == execution_plan.schema().fields().len()
-    {
-        return Ok(None);
-    }
-
-    let new_execution_plan =
-        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
-
-    // Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
-    let embed_project_exprs = projection_index
-        .iter()
-        .zip(new_execution_plan.schema().fields())
-        .map(|(index, field)| {
-            (
-                Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
-                field.name().to_owned(),
-            )
-        })
-        .collect::<Vec<_>>();
-
-    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
-
-    for (expr, alias) in projection.expr() {
-        // update column index for projection expression since the input schema has been changed.
-        let Some(expr) = update_expr(expr, embed_project_exprs.as_slice(), false)? else {
-            return Ok(None);
-        };
-        new_projection_exprs.push((expr, alias.clone()));
-    }
-    // Old projection may contain some alias or expression such as `a + 1` and `CAST('true' AS BOOLEAN)`, but our projection_exprs in hash join just contain column, so we need to create the new projection to keep the original projection.
-    let new_projection = Arc::new(ProjectionExec::try_new(
-        new_projection_exprs,
-        Arc::clone(&new_execution_plan) as _,
-    )?);
-    if is_projection_removable(&new_projection) {
-        Ok(Some(new_execution_plan))
-    } else {
-        Ok(Some(new_projection))
-    }
-}
-
-/// Collect all column indices from the given projection expressions.
-fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usize> {
-    // Collect indices and remove duplicates.
-    let mut indices = exprs
-        .iter()
-        .flat_map(|(expr, _)| collect_columns(expr))
-        .map(|x| x.index())
-        .collect::<std::collections::HashSet<_>>()
-        .into_iter()
-        .collect::<Vec<_>>();
-    indices.sort();
-    indices
-}
-
-struct JoinData {
-    projected_left_child: ProjectionExec,
-    projected_right_child: ProjectionExec,
-    join_filter: Option<JoinFilter>,
-    join_on: JoinOn,
-}
-
-fn try_pushdown_through_join(
-    projection: &ProjectionExec,
-    join_left: &Arc<dyn ExecutionPlan>,
-    join_right: &Arc<dyn ExecutionPlan>,
-    join_on: JoinOnRef,
-    schema: SchemaRef,
-    filter: Option<&JoinFilter>,
-) -> Result<Option<JoinData>> {
-    // Convert projected expressions to columns. We can not proceed if this is not possible.
-    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
-        return Ok(None);
-    };
-
-    let (far_right_left_col_ind, far_left_right_col_ind) =
-        join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
-
-    if !join_allows_pushdown(
-        &projection_as_columns,
-        &schema,
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-    ) {
-        return Ok(None);
-    }
-
-    let new_filter = if let Some(filter) = filter {
-        match update_join_filter(
-            &projection_as_columns[0..=far_right_left_col_ind as _],
-            &projection_as_columns[far_left_right_col_ind as _..],
-            filter,
-            join_left.schema().fields().len(),
-        ) {
-            Some(updated_filter) => Some(updated_filter),
-            None => return Ok(None),
-        }
-    } else {
-        None
-    };
-
-    let Some(new_on) = update_join_on(
-        &projection_as_columns[0..=far_right_left_col_ind as _],
-        &projection_as_columns[far_left_right_col_ind as _..],
-        join_on,
-        join_left.schema().fields().len(),
-    ) else {
-        return Ok(None);
-    };
-
-    let (new_left, new_right) = new_join_children(
-        &projection_as_columns,
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-        join_left,
-        join_right,
-    )?;
-
-    Ok(Some(JoinData {
-        projected_left_child: new_left,
-        projected_right_child: new_right,
-        join_filter: new_filter,
-        join_on: new_on,
-    }))
-}
-
-/// Tries to push `projection` down through `nested_loop_join`. If possible, performs the
-/// pushdown and returns a new [`NestedLoopJoinExec`] as the top plan which has projections
-/// as its children. Otherwise, returns `None`.
-fn try_pushdown_through_nested_loop_join(
-    projection: &ProjectionExec,
-    nl_join: &NestedLoopJoinExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // TODO: currently if there is projection in NestedLoopJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
-    if nl_join.contains_projection() {
-        return Ok(None);
-    }
-
-    if let Some(JoinData {
-        projected_left_child,
-        projected_right_child,
-        join_filter,
-        ..
-    }) = try_pushdown_through_join(
-        projection,
-        nl_join.left(),
-        nl_join.right(),
-        &[],
-        nl_join.schema(),
-        nl_join.filter(),
-    )? {
-        Ok(Some(Arc::new(NestedLoopJoinExec::try_new(
-            Arc::new(projected_left_child),
-            Arc::new(projected_right_child),
-            join_filter,
-            nl_join.join_type(),
-            // Returned early if projection is not None
-            None,
-        )?)))
-    } else {
-        try_embed_projection(projection, nl_join)
-    }
-}
-
-/// Tries to push `projection` down through `hash_join`. If possible, performs the
-/// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections
-/// as its children. Otherwise, returns `None`.
-fn try_pushdown_through_hash_join(
-    projection: &ProjectionExec,
-    hash_join: &HashJoinExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later.
-    if hash_join.contains_projection() {
-        return Ok(None);
-    }
-
-    if let Some(JoinData {
-        projected_left_child,
-        projected_right_child,
-        join_filter,
-        join_on,
-    }) = try_pushdown_through_join(
-        projection,
-        hash_join.left(),
-        hash_join.right(),
-        hash_join.on(),
-        hash_join.schema(),
-        hash_join.filter(),
-    )? {
-        Ok(Some(Arc::new(HashJoinExec::try_new(
-            Arc::new(projected_left_child),
-            Arc::new(projected_right_child),
-            join_on,
-            join_filter,
-            hash_join.join_type(),
-            // Returned early if projection is not None
-            None,
-            *hash_join.partition_mode(),
-            hash_join.null_equals_null,
-        )?)))
-    } else {
-        try_embed_projection(projection, hash_join)
-    }
-}
-
-/// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done,
-/// it returns the new swapped version having the [`CrossJoinExec`] as the top plan.
-/// Otherwise, it returns None.
-fn try_swapping_with_cross_join(
-    projection: &ProjectionExec,
-    cross_join: &CrossJoinExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed.
-    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
-        return Ok(None);
-    };
-
-    let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
-        cross_join.left().schema().fields().len(),
-        &projection_as_columns,
-    );
-
-    if !join_allows_pushdown(
-        &projection_as_columns,
-        &cross_join.schema(),
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-    ) {
-        return Ok(None);
-    }
-
-    let (new_left, new_right) = new_join_children(
-        &projection_as_columns,
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-        cross_join.left(),
-        cross_join.right(),
-    )?;
-
-    Ok(Some(Arc::new(CrossJoinExec::new(
-        Arc::new(new_left),
-        Arc::new(new_right),
-    ))))
-}
-
-/// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done,
-/// it returns the new swapped version having the [`SortMergeJoinExec`] as the top plan.
-/// Otherwise, it returns None.
-fn try_swapping_with_sort_merge_join(
-    projection: &ProjectionExec,
-    sm_join: &SortMergeJoinExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed.
-    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
-        return Ok(None);
-    };
-
-    let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
-        sm_join.left().schema().fields().len(),
-        &projection_as_columns,
-    );
-
-    if !join_allows_pushdown(
-        &projection_as_columns,
-        &sm_join.schema(),
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-    ) {
-        return Ok(None);
-    }
-
-    let Some(new_on) = update_join_on(
-        &projection_as_columns[0..=far_right_left_col_ind as _],
-        &projection_as_columns[far_left_right_col_ind as _..],
-        sm_join.on(),
-        sm_join.left().schema().fields().len(),
-    ) else {
-        return Ok(None);
-    };
-
-    let (new_left, new_right) = new_join_children(
-        &projection_as_columns,
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-        sm_join.children()[0],
-        sm_join.children()[1],
-    )?;
-
-    Ok(Some(Arc::new(SortMergeJoinExec::try_new(
-        Arc::new(new_left),
-        Arc::new(new_right),
-        new_on,
-        sm_join.filter.clone(),
-        sm_join.join_type,
-        sm_join.sort_options.clone(),
-        sm_join.null_equals_null,
-    )?)))
-}
-
-/// Tries to swap the projection with its input [`SymmetricHashJoinExec`]. If it can be done,
-/// it returns the new swapped version having the [`SymmetricHashJoinExec`] as the top plan.
-/// Otherwise, it returns None.
-fn try_swapping_with_sym_hash_join(
-    projection: &ProjectionExec,
-    sym_join: &SymmetricHashJoinExec,
-) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-    // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed.
-    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
-        return Ok(None);
-    };
-
-    let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
-        sym_join.left().schema().fields().len(),
-        &projection_as_columns,
-    );
-
-    if !join_allows_pushdown(
-        &projection_as_columns,
-        &sym_join.schema(),
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-    ) {
-        return Ok(None);
-    }
-
-    let Some(new_on) = update_join_on(
-        &projection_as_columns[0..=far_right_left_col_ind as _],
-        &projection_as_columns[far_left_right_col_ind as _..],
-        sym_join.on(),
-        sym_join.left().schema().fields().len(),
-    ) else {
-        return Ok(None);
-    };
-
-    let new_filter = if let Some(filter) = sym_join.filter() {
-        match update_join_filter(
-            &projection_as_columns[0..=far_right_left_col_ind as _],
-            &projection_as_columns[far_left_right_col_ind as _..],
-            filter,
-            sym_join.left().schema().fields().len(),
-        ) {
-            Some(updated_filter) => Some(updated_filter),
-            None => return Ok(None),
-        }
-    } else {
-        None
-    };
-
-    let (new_left, new_right) = new_join_children(
-        &projection_as_columns,
-        far_right_left_col_ind,
-        far_left_right_col_ind,
-        sym_join.left(),
-        sym_join.right(),
-    )?;
-
-    Ok(Some(Arc::new(SymmetricHashJoinExec::try_new(
-        Arc::new(new_left),
-        Arc::new(new_right),
-        new_on,
-        new_filter,
-        sym_join.join_type(),
-        sym_join.null_equals_null(),
-        sym_join
-            .right()
-            .output_ordering()
-            .map(|p| LexOrdering::new(p.to_vec())),
-        sym_join
-            .left()
-            .output_ordering()
-            .map(|p| LexOrdering::new(p.to_vec())),
-        sym_join.partition_mode(),
-    )?)))
-}
-
-/// Compare the inputs and outputs of the projection. All expressions must be
-/// columns without alias, and projection does not change the order of fields.
-/// For example, if the input schema is `a, b`, `SELECT a, b` is removable,
-/// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not.
-fn is_projection_removable(projection: &ProjectionExec) -> bool {
-    let exprs = projection.expr();
-    exprs.iter().enumerate().all(|(idx, (expr, alias))| {
-        let Some(col) = expr.as_any().downcast_ref::<Column>() else {
-            return false;
-        };
-        col.name() == alias && col.index() == idx
-    }) && exprs.len() == projection.input().schema().fields().len()
-}
-
-/// Given the expression set of a projection, checks if the projection causes
-/// any renaming or constructs a non-`Column` physical expression.
-fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
-    exprs.iter().all(|(expr, alias)| {
-        expr.as_any()
-            .downcast_ref::<Column>()
-            .map(|column| column.name() == alias)
-            .unwrap_or(false)
-    })
-}
-
-/// Updates a source provider's projected columns according to the given
-/// projection operator's expressions. To use this function safely, one must
-/// ensure that all expressions are `Column` expressions without aliases.
-fn new_projections_for_columns(
-    projection: &ProjectionExec,
-    source: &[usize],
-) -> Vec<usize> {
-    projection
-        .expr()
-        .iter()
-        .filter_map(|(expr, _)| {
-            expr.as_any()
-                .downcast_ref::<Column>()
-                .map(|expr| source[expr.index()])
-        })
-        .collect()
-}
-
-/// The function operates in two modes:
-///
-/// 1) When `sync_with_child` is `true`:
-///
-///    The function updates the indices of `expr` if the expression resides
-///    in the input plan. For instance, given the expressions `a@1 + b@2`
-///    and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are
-///    updated to `a@0 + b@1` and `c@2`.
-///
-/// 2) When `sync_with_child` is `false`:
-///
-///    The function determines how the expression would be updated if a projection
-///    was placed before the plan associated with the expression. If the expression
-///    cannot be rewritten after the projection, it returns `None`. For example,
-///    given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with
-///    an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
-///    `a@0`, but `b@2` results in `None` since the projection does not include `b`.
-fn update_expr(
-    expr: &Arc<dyn PhysicalExpr>,
-    projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
-    sync_with_child: bool,
-) -> Result<Option<Arc<dyn PhysicalExpr>>> {
-    #[derive(Debug, PartialEq)]
-    enum RewriteState {
-        /// The expression is unchanged.
-        Unchanged,
-        /// Some part of the expression has been rewritten
-        RewrittenValid,
-        /// Some part of the expression has been rewritten, but some column
-        /// references could not be.
-        RewrittenInvalid,
-    }
-
-    let mut state = RewriteState::Unchanged;
-
-    let new_expr = Arc::clone(expr)
-        .transform_up(|expr: Arc<dyn PhysicalExpr>| {
-            if state == RewriteState::RewrittenInvalid {
-                return Ok(Transformed::no(expr));
-            }
-
-            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
-                return Ok(Transformed::no(expr));
-            };
-            if sync_with_child {
-                state = RewriteState::RewrittenValid;
-                // Update the index of `column`:
-                Ok(Transformed::yes(Arc::clone(
-                    &projected_exprs[column.index()].0,
-                )))
-            } else {
-                // default to invalid, in case we can't find the relevant column
-                state = RewriteState::RewrittenInvalid;
-                // Determine how to update `column` to accommodate `projected_exprs`
-                projected_exprs
-                    .iter()
-                    .enumerate()
-                    .find_map(|(index, (projected_expr, alias))| {
-                        projected_expr.as_any().downcast_ref::<Column>().and_then(
-                            |projected_column| {
-                                (column.name().eq(projected_column.name())
-                                    && column.index() == projected_column.index())
-                                .then(|| {
-                                    state = RewriteState::RewrittenValid;
-                                    Arc::new(Column::new(alias, index)) as _
-                                })
-                            },
-                        )
-                    })
-                    .map_or_else(
-                        || Ok(Transformed::no(expr)),
-                        |c| Ok(Transformed::yes(c)),
-                    )
-            }
-        })
-        .data();
-
-    new_expr.map(|e| (state == RewriteState::RewrittenValid).then_some(e))
-}
-
-/// Creates a new [`ProjectionExec`] instance with the given child plan and
-/// projected expressions.
-fn make_with_child(
-    projection: &ProjectionExec,
-    child: &Arc<dyn ExecutionPlan>,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
-        .map(|e| Arc::new(e) as _)
-}
-
-/// Returns `true` if all the expressions in the argument are `Column`s.
-fn all_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
-    exprs.iter().all(|(expr, _)| expr.as_any().is::<Column>())
-}
-
-/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given
-/// expressions is not a `Column`, returns `None`.
-fn physical_to_column_exprs(
-    exprs: &[(Arc<dyn PhysicalExpr>, String)],
-) -> Option<Vec<(Column, String)>> {
-    exprs
-        .iter()
-        .map(|(expr, alias)| {
-            expr.as_any()
-                .downcast_ref::<Column>()
-                .map(|col| (col.clone(), alias.clone()))
-        })
-        .collect()
-}
-
-/// Returns the last index before encountering a column coming from the right table when traveling
-/// through the projection from left to right, and the last index before encountering a column
-/// coming from the left table when traveling through the projection from right to left.
-/// If there is no column in the projection coming from the left side, it returns (-1, ...),
-/// if there is no column in the projection coming from the right side, it returns (..., projection length).
-fn join_table_borders(
-    left_table_column_count: usize,
-    projection_as_columns: &[(Column, String)],
-) -> (i32, i32) {
-    let far_right_left_col_ind = projection_as_columns
-        .iter()
-        .enumerate()
-        .take_while(|(_, (projection_column, _))| {
-            projection_column.index() < left_table_column_count
-        })
-        .last()
-        .map(|(index, _)| index as i32)
-        .unwrap_or(-1);
-
-    let far_left_right_col_ind = projection_as_columns
-        .iter()
-        .enumerate()
-        .rev()
-        .take_while(|(_, (projection_column, _))| {
-            projection_column.index() >= left_table_column_count
-        })
-        .last()
-        .map(|(index, _)| index as i32)
-        .unwrap_or(projection_as_columns.len() as i32);
-
-    (far_right_left_col_ind, far_left_right_col_ind)
-}
-
-/// Tries to update the equi-join `Column`'s of a join as if the input of
-/// the join was replaced by a projection.
-fn update_join_on(
-    proj_left_exprs: &[(Column, String)],
-    proj_right_exprs: &[(Column, String)],
-    hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
-    left_field_size: usize,
-) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
-    // TODO: Clippy wants the "map" call removed, but doing so generates
-    //       a compilation error. Remove the clippy directive once this
-    //       issue is fixed.
-    #[allow(clippy::map_identity)]
-    let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
-        .iter()
-        .map(|(left, right)| (left, right))
-        .unzip();
-
-    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
-    let new_right_columns =
-        new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
-
-    match (new_left_columns, new_right_columns) {
-        (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
-        _ => None,
-    }
-}
-
-/// This function generates a new set of columns to be used in a hash join
-/// operation based on a set of equi-join conditions (`hash_join_on`) and a
-/// list of projection expressions (`projection_exprs`).
-///
-/// Notes: Column indices in the projection expressions are based on the join schema,
-/// whereas the join on expressions are based on the join child schema. `column_index_offset`
-/// represents the offset between them.
-fn new_columns_for_join_on(
-    hash_join_on: &[&PhysicalExprRef],
-    projection_exprs: &[(Column, String)],
-    column_index_offset: usize,
-) -> Option<Vec<Column>> {
-    let new_columns = hash_join_on
-        .iter()
-        .filter_map(|on| {
-            // Rewrite all columns in `on`
-            Arc::clone(*on)
-                .transform(|expr| {
-                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-                        // Find the column in the projection expressions
-                        let new_column = projection_exprs
-                            .iter()
-                            .enumerate()
-                            .find(|(_, (proj_column, _))| {
-                                column.name() == proj_column.name()
-                                    && column.index() + column_index_offset
-                                        == proj_column.index()
-                            })
-                            .map(|(index, (_, alias))| Column::new(alias, index));
-                        if let Some(new_column) = new_column {
-                            Ok(Transformed::yes(Arc::new(new_column)))
-                        } else {
-                            // If the column is not found in the projection expressions,
-                            // it means that the column is not projected. In this case,
-                            // we cannot push the projection down.
-                            internal_err!(
-                                "Column {:?} not found in projection expressions",
-                                column
-                            )
-                        }
-                    } else {
-                        Ok(Transformed::no(expr))
-                    }
-                })
-                .data()
-                .ok()
-        })
-        .collect::<Vec<_>>();
-    (new_columns.len() == hash_join_on.len()).then_some(new_columns)
-}
-
-/// Tries to update the column indices of a [`JoinFilter`] as if the input of
-/// the join was replaced by a projection.
-fn update_join_filter(
-    projection_left_exprs: &[(Column, String)],
-    projection_right_exprs: &[(Column, String)],
-    join_filter: &JoinFilter,
-    left_field_size: usize,
-) -> Option<JoinFilter> {
-    let mut new_left_indices = new_indices_for_join_filter(
-        join_filter,
-        JoinSide::Left,
-        projection_left_exprs,
-        0,
-    )
-    .into_iter();
-    let mut new_right_indices = new_indices_for_join_filter(
-        join_filter,
-        JoinSide::Right,
-        projection_right_exprs,
-        left_field_size,
-    )
-    .into_iter();
-
-    // Check if all columns match:
-    (new_right_indices.len() + new_left_indices.len()
-        == join_filter.column_indices().len())
-    .then(|| {
-        JoinFilter::new(
-            Arc::clone(join_filter.expression()),
-            join_filter
-                .column_indices()
-                .iter()
-                .map(|col_idx| ColumnIndex {
-                    index: if col_idx.side == JoinSide::Left {
-                        new_left_indices.next().unwrap()
-                    } else {
-                        new_right_indices.next().unwrap()
-                    },
-                    side: col_idx.side,
-                })
-                .collect(),
-            Arc::clone(join_filter.schema()),
-        )
-    })
-}
-
-/// This function determines and returns a vector of indices representing the
-/// positions of columns in `projection_exprs` that are involved in `join_filter`,
-/// and correspond to a particular side (`join_side`) of the join operation.
-///
-/// Notes: Column indices in the projection expressions are based on the join schema,
-/// whereas the join filter is based on the join child schema. `column_index_offset`
-/// represents the offset between them.
-fn new_indices_for_join_filter(
-    join_filter: &JoinFilter,
-    join_side: JoinSide,
-    projection_exprs: &[(Column, String)],
-    column_index_offset: usize,
-) -> Vec<usize> {
-    join_filter
-        .column_indices()
-        .iter()
-        .filter(|col_idx| col_idx.side == join_side)
-        .filter_map(|col_idx| {
-            projection_exprs
-                .iter()
-                .position(|(col, _)| col_idx.index + column_index_offset == col.index())
-        })
-        .collect()
-}
-
-/// Checks three conditions for pushing a projection down through a join:
-/// - Projection must narrow the join output schema.
-/// - Columns coming from left/right tables must be collected at the left/right
-///   sides of the output table.
-/// - Left or right table is not lost after the projection.
-fn join_allows_pushdown(
-    projection_as_columns: &[(Column, String)],
-    join_schema: &SchemaRef,
-    far_right_left_col_ind: i32,
-    far_left_right_col_ind: i32,
-) -> bool {
-    // Projection must narrow the join output:
-    projection_as_columns.len() < join_schema.fields().len()
-    // Are the columns from different tables mixed?
-    && (far_right_left_col_ind + 1 == far_left_right_col_ind)
-    // Left or right table is not lost after the projection.
-    && far_right_left_col_ind >= 0
-    && far_left_right_col_ind < projection_as_columns.len() as i32
-}
-
-/// If pushing down the projection over this join's children seems possible,
-/// this function constructs the new [`ProjectionExec`]s that will come on top
-/// of the original children of the join.
-fn new_join_children(
-    projection_as_columns: &[(Column, String)],
-    far_right_left_col_ind: i32,
-    far_left_right_col_ind: i32,
-    left_child: &Arc<dyn ExecutionPlan>,
-    right_child: &Arc<dyn ExecutionPlan>,
-) -> Result<(ProjectionExec, ProjectionExec)> {
-    let new_left = ProjectionExec::try_new(
-        projection_as_columns[0..=far_right_left_col_ind as _]
-            .iter()
-            .map(|(col, alias)| {
-                (
-                    Arc::new(Column::new(col.name(), col.index())) as _,
-                    alias.clone(),
-                )
-            })
-            .collect_vec(),
-        Arc::clone(left_child),
-    )?;
-    let left_size = left_child.schema().fields().len() as i32;
-    let new_right = ProjectionExec::try_new(
-        projection_as_columns[far_left_right_col_ind as _..]
-            .iter()
-            .map(|(col, alias)| {
-                (
-                    Arc::new(Column::new(
-                        col.name(),
-                        // Align projected expressions coming from the right
-                        // table with the new right child projection:
-                        (col.index() as i32 - left_size) as _,
-                    )) as _,
-                    alias.clone(),
-                )
-            })
-            .collect_vec(),
-        Arc::clone(right_child),
-    )?;
-
-    Ok((new_left, new_right))
-}
-
 #[cfg(test)]
 mod tests {
-    use super::*;
     use std::any::Any;
 
+    use super::*;
     use crate::datasource::file_format::file_compression_type::FileCompressionType;
     use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::physical_plan::CsvExec;
     use crate::datasource::physical_plan::FileScanConfig;
     use crate::physical_plan::get_plan_string;
-    use crate::physical_plan::joins::StreamJoinPartitionMode;
+    use crate::physical_plan::joins::{
+        HashJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
+    };
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::repartition::RepartitionExec;
+    use crate::physical_plan::sorts::sort::SortExec;
+    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 
-    use arrow_schema::{DataType, Field, Schema, SortOptions};
-    use datafusion_common::{JoinType, ScalarValue};
+    use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
+    use datafusion_common::{JoinSide, JoinType, ScalarValue};
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    use datafusion_expr::{
         ColumnarValue, Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
     };
     use datafusion_physical_expr::expressions::{
-        binary, col, BinaryExpr, CaseExpr, CastExpr, NegativeExpr,
+        binary, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
     };
     use datafusion_physical_expr::ScalarFunctionExpr;
+    use datafusion_physical_expr::{
+        Distribution, Partitioning, PhysicalExpr, PhysicalSortExpr,
+        PhysicalSortRequirement,
+    };
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+    use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::filter::FilterExec;
+    use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
     use datafusion_physical_plan::joins::PartitionMode;
+    use datafusion_physical_plan::projection::{update_expr, ProjectionExec};
     use datafusion_physical_plan::streaming::PartitionStream;
+    use datafusion_physical_plan::streaming::StreamingTableExec;
+    use datafusion_physical_plan::union::UnionExec;
+
+    use itertools::Itertools;
 
     /// Mocked UDF
     #[derive(Debug)]
@@ -1680,61 +413,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_join_table_borders() -> Result<()> {
-        let projections = vec![
-            (Column::new("b", 1), "b".to_owned()),
-            (Column::new("c", 2), "c".to_owned()),
-            (Column::new("e", 4), "e".to_owned()),
"e".to_owned()), - (Column::new("d", 3), "d".to_owned()), - (Column::new("c", 2), "c".to_owned()), - (Column::new("f", 5), "f".to_owned()), - (Column::new("h", 7), "h".to_owned()), - (Column::new("g", 6), "g".to_owned()), - ]; - let left_table_column_count = 5; - assert_eq!( - join_table_borders(left_table_column_count, &projections), - (4, 5) - ); - - let left_table_column_count = 8; - assert_eq!( - join_table_borders(left_table_column_count, &projections), - (7, 8) - ); - - let left_table_column_count = 1; - assert_eq!( - join_table_borders(left_table_column_count, &projections), - (-1, 0) - ); - - let projections = vec![ - (Column::new("a", 0), "a".to_owned()), - (Column::new("b", 1), "b".to_owned()), - (Column::new("d", 3), "d".to_owned()), - (Column::new("g", 6), "g".to_owned()), - (Column::new("e", 4), "e".to_owned()), - (Column::new("f", 5), "f".to_owned()), - (Column::new("e", 4), "e".to_owned()), - (Column::new("h", 7), "h".to_owned()), - ]; - let left_table_column_count = 5; - assert_eq!( - join_table_borders(left_table_column_count, &projections), - (2, 7) - ); - - let left_table_column_count = 7; - assert_eq!( - join_table_borders(left_table_column_count, &projections), - (6, 7) - ); - - Ok(()) - } - fn create_simple_csv_exec() -> Arc { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), @@ -2409,22 +1087,6 @@ mod tests { Ok(()) } - #[test] - fn test_collect_column_indices() -> Result<()> { - let expr = Arc::new(BinaryExpr::new( - Arc::new(Column::new("b", 7)), - Operator::Minus, - Arc::new(BinaryExpr::new( - Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), - Operator::Plus, - Arc::new(Column::new("a", 1)), - )), - )); - let column_indices = collect_column_indices(&[(expr, "b-(1+a)".to_string())]); - assert_eq!(column_indices, vec![1, 7]); - Ok(()) - } - #[test] fn test_nested_loop_join_after_projection() -> Result<()> { let left_csv = create_simple_csv_exec(); diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index e107bb85d7b8..90a570894a44 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -24,21 +24,23 @@ use std::sync::Arc; -use datafusion_execution::TaskContext; -use datafusion_physical_plan::sorts::sort::SortExec; -use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, -}; +use crate::PhysicalOptimizerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; -use datafusion_physical_expr::{Distribution, LexRequirement}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement}; +use datafusion_physical_plan::projection::{ + make_with_child, update_expr, ProjectionExec, +}; +use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, +}; use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties}; -use crate::PhysicalOptimizerRule; - /// This rule either adds or removes [`OutputRequirements`]s to/from the physical /// plan according to its `mode` attribute, which is set by the constructors /// `new_add_mode` and `new_remove_mode`. 
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs
index e107bb85d7b8..90a570894a44 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -24,21 +24,23 @@
 
 use std::sync::Arc;
 
-use datafusion_execution::TaskContext;
-use datafusion_physical_plan::sorts::sort::SortExec;
-use datafusion_physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
-};
+use crate::PhysicalOptimizerRule;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Result, Statistics};
-use datafusion_physical_expr::{Distribution, LexRequirement};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
+use datafusion_physical_plan::projection::{
+    make_with_child, update_expr, ProjectionExec,
+};
+use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
+};
 use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
 
-use crate::PhysicalOptimizerRule;
-
 /// This rule either adds or removes [`OutputRequirements`]s to/from the physical
 /// plan according to its `mode` attribute, which is set by the constructors
 /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
@@ -192,6 +194,56 @@ impl ExecutionPlan for OutputRequirementExec {
     fn statistics(&self) -> Result<Statistics> {
         self.input.statistics()
     }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to push it down:
+        if projection.expr().len() >= projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        let mut updated_sort_reqs = LexRequirement::new(vec![]);
+        // `None` and an empty vector are treated the same way here.
+        if let Some(reqs) = &self.required_input_ordering()[0] {
+            for req in &reqs.inner {
+                let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)?
+                else {
+                    return Ok(None);
+                };
+                updated_sort_reqs.push(PhysicalSortRequirement {
+                    expr: new_expr,
+                    options: req.options,
+                });
+            }
+        }
+
+        let dist_req = match &self.required_input_distribution()[0] {
+            Distribution::HashPartitioned(exprs) => {
+                let mut updated_exprs = vec![];
+                for expr in exprs {
+                    let Some(new_expr) = update_expr(expr, projection.expr(), false)?
+                    else {
+                        return Ok(None);
+                    };
+                    updated_exprs.push(new_expr);
+                }
+                Distribution::HashPartitioned(updated_exprs)
+            }
+            dist => dist.clone(),
+        };
+
+        make_with_child(projection, &self.input())
+            .map(|input| {
+                OutputRequirementExec::new(
+                    input,
+                    (!updated_sort_reqs.is_empty()).then_some(updated_sort_reqs),
+                    dist_req,
+                )
+            })
+            .map(|e| Some(Arc::new(e) as _))
+    }
 }
 
 impl PhysicalOptimizerRule for OutputRequirements {
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 7c1bdba2f339..3900bd1ddca9 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -27,10 +27,10 @@ use super::{
     DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
     Statistics,
 };
-
+use crate::execution_plan::CardinalityEffect;
+use crate::projection::{make_with_child, ProjectionExec};
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 
-use crate::execution_plan::CardinalityEffect;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 
@@ -184,6 +184,22 @@ impl ExecutionPlan for CoalescePartitionsExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }
+
+    /// Tries to swap `projection` with its input, which is known to be a
+    /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
+    /// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to push it down:
+        if projection.expr().len() >= projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+        // CoalescePartitionsExec always has a single child, so zero indexing is safe.
+        make_with_child(projection, projection.input().children()[0])
+            .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
+    }
 }
 
 #[cfg(test)]
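Both implementations above open with the same guard: a projection that keeps at least as many expressions as its input has fields cannot shrink intermediate batches, so pushdown is skipped. A sketch of that shared pattern for an arbitrary single-child operator (`push_below_unary` is a hypothetical helper, not part of this PR; `make_with_child` is the public helper imported above):

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_plan::projection::{make_with_child, ProjectionExec};
use datafusion_physical_plan::ExecutionPlan;

/// Re-create `projection` directly on top of `child` when doing so narrows
/// the schema; the calling operator then re-instantiates itself on top of
/// the returned node.
fn push_below_unary(
    projection: &ProjectionExec,
    child: &Arc<dyn ExecutionPlan>,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // Guard: the projection must output fewer columns than its input provides.
    if projection.expr().len() >= projection.input().schema().fields().len() {
        return Ok(None);
    }
    make_with_child(projection, child).map(Some)
}
```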
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 753234c09994..a54b46111f53 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -15,42 +15,45 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::any::Any;
-use std::fmt::Debug;
-use std::sync::Arc;
-
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use arrow_array::Array;
-use futures::stream::{StreamExt, TryStreamExt};
-use tokio::task::JoinSet;
+pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
+pub use crate::metrics::Metric;
+pub use crate::ordering::InputOrderMode;
+pub use crate::stream::EmptyRecordBatchStream;
 
-use datafusion_common::config::ConfigOptions;
 pub use datafusion_common::hash_utils;
 pub use datafusion_common::utils::project_schema;
-use datafusion_common::{exec_err, Constraints, Result};
 pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
-use datafusion_execution::TaskContext;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
 pub use datafusion_expr::{Accumulator, ColumnarValue};
 pub use datafusion_physical_expr::window::WindowExpr;
 pub use datafusion_physical_expr::{
     expressions, udf, Distribution, Partitioning, PhysicalExpr,
 };
-use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
 
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::display::DisplayableExecutionPlan;
-pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
-pub use crate::metrics::Metric;
 use crate::metrics::MetricsSet;
-pub use crate::ordering::InputOrderMode;
+use crate::projection::ProjectionExec;
 use crate::repartition::RepartitionExec;
 use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
-pub use crate::stream::EmptyRecordBatchStream;
 use crate::stream::RecordBatchStreamAdapter;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{exec_err, Constraints, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
+
+use futures::stream::{StreamExt, TryStreamExt};
+use tokio::task::JoinSet;
+
 /// Represent nodes in the DataFusion Physical Plan.
 ///
 /// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
@@ -431,6 +434,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Unknown
     }
+
+    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
+    ///
+    /// If the operator supports this optimization, the resulting plan will be:
+    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
+    /// Otherwise, `None` is returned and the plan stays as-is.
+    ///
+    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
+    /// or not possible, or `Err` on failure.
+ fn try_swapping_with_projection( + &self, + _projection: &ProjectionExec, + ) -> Result>> { + Ok(None) + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8e7c14f0baed..ae4a15ba5249 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -25,6 +25,11 @@ use super::{ RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::common::can_project; +use crate::execution_plan::CardinalityEffect; +use crate::projection::{ + make_with_child, try_embed_projection, update_expr, EmbeddedProjection, + ProjectionExec, +}; use crate::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, DisplayFormatType, ExecutionPlan, @@ -49,7 +54,6 @@ use datafusion_physical_expr::{ ExprBoundaries, PhysicalExpr, }; -use crate::execution_plan::CardinalityEffect; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -399,6 +403,38 @@ impl ExecutionPlan for FilterExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::LowerEqual } + + /// Tries to swap `projection` with its input (`filter`). If possible, performs + /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection does not narrow the schema, we should not try to push it down: + if projection.expr().len() < projection.input().schema().fields().len() { + // Each column in the predicate expression must exist after the projection. + if let Some(new_predicate) = + update_expr(self.predicate(), projection.expr(), false)? + { + return FilterExec::try_new( + new_predicate, + make_with_child(projection, self.input())?, + ) + .and_then(|e| { + let selectivity = self.default_selectivity(); + e.with_default_selectivity(selectivity) + }) + .map(|e| Some(Arc::new(e) as _)); + } + } + try_embed_projection(projection, self) + } +} + +impl EmbeddedProjection for FilterExec { + fn with_projection(&self, projection: Option>) -> Result { + self.with_projection(projection) + } } /// This function ensures that all bounds in the `ExprBoundaries` vector are diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 69300fce7745..87fd0f96586a 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -18,6 +18,8 @@ //! Defines the cross join plan for loading the left side of the cross join //! 
and producing batches in parallel for the right partitions +use std::{any::Any, sync::Arc, task::Poll}; + use super::utils::{ adjust_right_output_partitioning, reorder_output_after_swap, BatchSplitter, BatchTransformer, BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut, @@ -26,14 +28,17 @@ use super::utils::{ use crate::coalesce_partitions::CoalescePartitionsExec; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::projection::{ + join_allows_pushdown, join_table_borders, new_join_children, + physical_to_column_exprs, ProjectionExec, +}; use crate::{ handle_state, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use arrow::compute::concat_batches; -use std::{any::Any, sync::Arc, task::Poll}; +use arrow::compute::concat_batches; use arrow::datatypes::{Fields, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::RecordBatchOptions; @@ -335,6 +340,47 @@ impl ExecutionPlan for CrossJoinExec { self.right.statistics()?, )) } + + /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, + /// it returns the new swapped version having the [`CrossJoinExec`] as the top plan. + /// Otherwise, it returns None. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed. + let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) + else { + return Ok(None); + }; + + let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders( + self.left().schema().fields().len(), + &projection_as_columns, + ); + + if !join_allows_pushdown( + &projection_as_columns, + &self.schema(), + far_right_left_col_ind, + far_left_right_col_ind, + ) { + return Ok(None); + } + + let (new_left, new_right) = new_join_children( + &projection_as_columns, + far_right_left_col_ind, + far_left_right_col_ind, + self.left(), + self.right(), + )?; + + Ok(Some(Arc::new(CrossJoinExec::new( + Arc::new(new_left), + Arc::new(new_right), + )))) + } } /// [left/right]_col_count are required in case the column statistics are None diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 4e224ea65d19..bac72e8a0cc7 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -33,6 +33,11 @@ use super::{ PartitionMode, SharedBitmapBuilder, }; use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::projection::{ + try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, + ProjectionExec, +}; +use crate::spill::get_record_batch_memory_size; use crate::ExecutionPlanProperties; use crate::{ coalesce_partitions::CoalescePartitionsExec, @@ -69,15 +74,14 @@ use datafusion_common::{ }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; +use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr_common::datum::compare_op_for_nested; -use crate::spill::get_record_batch_memory_size; use ahash::RandomState; -use datafusion_expr::Operator; -use 
datafusion_physical_expr_common::datum::compare_op_for_nested; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; @@ -864,6 +868,47 @@ impl ExecutionPlan for HashJoinExec { // Project statistics if there is a projection Ok(stats.project(self.projection.as_ref())) } + + /// Tries to push `projection` down through `hash_join`. If possible, performs the + /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections + /// as its children. Otherwise, returns `None`. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // TODO: currently if there is projection in HashJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later. + if self.contains_projection() { + return Ok(None); + } + + if let Some(JoinData { + projected_left_child, + projected_right_child, + join_filter, + join_on, + }) = try_pushdown_through_join( + projection, + self.left(), + self.right(), + self.on(), + self.schema(), + self.filter(), + )? { + Ok(Some(Arc::new(HashJoinExec::try_new( + Arc::new(projected_left_child), + Arc::new(projected_right_child), + join_on, + join_filter, + self.join_type(), + // Returned early if projection is not None + None, + *self.partition_mode(), + self.null_equals_null, + )?))) + } else { + try_embed_projection(projection, self) + } + } } /// Reads the left (build) side of the input, buffering it in memory, to build a @@ -1584,6 +1629,12 @@ impl Stream for HashJoinStream { } } +impl EmbeddedProjection for HashJoinExec { + fn with_projection(&self, projection: Option>) -> Result { + self.with_projection(projection) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index d2fa2fdc7b4d..ce960df32ec2 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -36,7 +36,12 @@ use crate::joins::utils::{ build_join_schema, check_join_is_valid, estimate_join_statistics, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, }; +use crate::joins::SharedBitmapBuilder; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::projection::{ + try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, + ProjectionExec, +}; use crate::{ handle_state, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, @@ -57,7 +62,6 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use crate::joins::SharedBitmapBuilder; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; @@ -555,6 +559,44 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, ) } + + /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the + /// pushdown and returns a new [`NestedLoopJoinExec`] as the top plan which has projections + /// as its children. Otherwise, returns `None`. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // TODO: currently if there is projection in NestedLoopJoinExec, we can't push down projection to left or right input. Maybe we can pushdown the mixed projection later. + if self.contains_projection() { + return Ok(None); + } + + if let Some(JoinData { + projected_left_child, + projected_right_child, + join_filter, + .. 
+ }) = try_pushdown_through_join( + projection, + self.left(), + self.right(), + &[], + self.schema(), + self.filter(), + )? { + Ok(Some(Arc::new(NestedLoopJoinExec::try_new( + Arc::new(projected_left_child), + Arc::new(projected_right_child), + join_filter, + self.join_type(), + // Returned early if projection is not None + None, + )?))) + } else { + try_embed_projection(projection, self) + } + } } /// Asynchronously collect input into a single batch, and creates `JoinLeftData` from it @@ -979,6 +1021,12 @@ impl RecordBatchStream for NestedLoopJoinStr } } +impl EmbeddedProjection for NestedLoopJoinExec { + fn with_projection(&self, projection: Option>) -> Result { + self.with_projection(projection) + } +} + #[cfg(test)] pub(crate) mod tests { use super::*; diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 650d4c1de149..e03aca76409d 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -34,6 +34,25 @@ use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::expressions::PhysicalSortExpr; +use crate::joins::utils::{ + build_join_schema, check_join_is_valid, estimate_join_statistics, + reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn, + JoinOnRef, +}; +use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use crate::projection::{ + join_allows_pushdown, join_table_borders, new_join_children, + physical_to_column_exprs, update_join_on, ProjectionExec, +}; +use crate::spill::spill_record_batches; +use crate::{ + metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, + ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, +}; + use arrow::array::*; use arrow::compute::{ self, concat_batches, filter_record_batch, is_not_null, take, SortOptions, @@ -54,21 +73,6 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; -use crate::execution_plan::{boundedness_from_children, EmissionType}; -use crate::expressions::PhysicalSortExpr; -use crate::joins::utils::{ - build_join_schema, check_join_is_valid, estimate_join_statistics, - reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn, - JoinOnRef, -}; -use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; -use crate::spill::spill_record_batches; -use crate::{ - metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, -}; - use futures::{Stream, StreamExt}; /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge @@ -504,6 +508,61 @@ impl ExecutionPlan for SortMergeJoinExec { &self.schema, ) } + + /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, + /// it returns the new swapped version having the [`SortMergeJoinExec`] as the top plan. + /// Otherwise, it returns None. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed. 
+ let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) + else { + return Ok(None); + }; + + let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders( + self.left().schema().fields().len(), + &projection_as_columns, + ); + + if !join_allows_pushdown( + &projection_as_columns, + &self.schema(), + far_right_left_col_ind, + far_left_right_col_ind, + ) { + return Ok(None); + } + + let Some(new_on) = update_join_on( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + self.on(), + self.left().schema().fields().len(), + ) else { + return Ok(None); + }; + + let (new_left, new_right) = new_join_children( + &projection_as_columns, + far_right_left_col_ind, + far_left_right_col_ind, + self.children()[0], + self.children()[1], + )?; + + Ok(Some(Arc::new(SortMergeJoinExec::try_new( + Arc::new(new_left), + Arc::new(new_right), + new_on, + self.filter.clone(), + self.join_type, + self.sort_options.clone(), + self.null_equals_null, + )?))) + } } /// Metrics for SortMergeJoinExec diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index b050d3adfeb5..2dbff25e703e 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -47,6 +47,10 @@ use crate::joins::utils::{ BatchTransformer, ColumnIndex, JoinFilter, JoinHashMapType, JoinOn, JoinOnRef, NoopBatchTransformer, StatefulStreamResult, }; +use crate::projection::{ + join_allows_pushdown, join_table_borders, new_join_children, + physical_to_column_exprs, update_join_filter, update_join_on, ProjectionExec, +}; use crate::{ joins::StreamJoinPartitionMode, metrics::{ExecutionPlanMetricsSet, MetricsSet}, @@ -71,9 +75,9 @@ use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use ahash::RandomState; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::{ready, Stream, StreamExt}; use parking_lot::Mutex; @@ -556,6 +560,81 @@ impl ExecutionPlan for SymmetricHashJoinExec { })) } } + + /// Tries to swap the projection with its input [`SymmetricHashJoinExec`]. If it can be done, + /// it returns the new swapped version having the [`SymmetricHashJoinExec`] as the top plan. + /// Otherwise, it returns None. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // Convert projected PhysicalExpr's to columns. If not possible, we cannot proceed. 
+ let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) + else { + return Ok(None); + }; + + let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders( + self.left().schema().fields().len(), + &projection_as_columns, + ); + + if !join_allows_pushdown( + &projection_as_columns, + &self.schema(), + far_right_left_col_ind, + far_left_right_col_ind, + ) { + return Ok(None); + } + + let Some(new_on) = update_join_on( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + self.on(), + self.left().schema().fields().len(), + ) else { + return Ok(None); + }; + + let new_filter = if let Some(filter) = self.filter() { + match update_join_filter( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + filter, + self.left().schema().fields().len(), + ) { + Some(updated_filter) => Some(updated_filter), + None => return Ok(None), + } + } else { + None + }; + + let (new_left, new_right) = new_join_children( + &projection_as_columns, + far_right_left_col_ind, + far_left_right_col_ind, + self.left(), + self.right(), + )?; + + Ok(Some(Arc::new(SymmetricHashJoinExec::try_new( + Arc::new(new_left), + Arc::new(new_right), + new_on, + new_filter, + self.join_type(), + self.null_equals_null(), + self.right() + .output_ordering() + .map(|p| LexOrdering::new(p.to_vec())), + self.left() + .output_ordering() + .map(|p| LexOrdering::new(p.to_vec())), + self.partition_mode(), + )?))) + } } /// A stream that issues [RecordBatch]es as they arrive from the right of the join. diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index fb58a04fcc20..5ad3c4881b39 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -17,7 +17,6 @@ //! Execution plan for reading in-memory batches of data -use parking_lot::RwLock; use std::any::Any; use std::fmt; use std::sync::Arc; @@ -29,6 +28,9 @@ use super::{ Statistics, }; use crate::execution_plan::{Boundedness, EmissionType}; +use crate::projection::{ + all_alias_free_columns, new_projections_for_columns, ProjectionExec, +}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -45,6 +47,7 @@ use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use futures::Stream; +use parking_lot::RwLock; /// Execution plan for reading in-memory batches of data #[derive(Clone)] @@ -166,6 +169,30 @@ impl ExecutionPlan for MemoryExec { self.projection.clone(), )) } + + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If there is any non-column or alias-carrier expression, Projection should not be removed. + // This process can be moved into MemoryExec, but it would be an overlap of their responsibility. 
+        all_alias_free_columns(projection.expr())
+            .then(|| {
+                let all_projections = (0..self.schema().fields().len()).collect();
+                let new_projections = new_projections_for_columns(
+                    projection,
+                    self.projection().as_ref().unwrap_or(&all_projections),
+                );
+
+                MemoryExec::try_new(
+                    self.partitions(),
+                    self.original_schema(),
+                    Some(new_projections),
+                )
+                .map(|e| Arc::new(e) as _)
+            })
+            .transpose()
+    }
 }
 
 impl MemoryExec {
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index e37a6b0dfb85..b364d4a870e3 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -26,24 +26,30 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use super::expressions::Column;
+use super::expressions::{CastExpr, Column, Literal};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{
     DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
+use crate::execution_plan::CardinalityEffect;
+use crate::joins::utils::{ColumnIndex, JoinFilter};
 use crate::{ColumnStatistics, DisplayFormatType, ExecutionPlan, PhysicalExpr};
 
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use datafusion_common::stats::Precision;
-use datafusion_common::Result;
+use datafusion_common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion_common::{internal_err, JoinSide, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::{CastExpr, Literal};
+use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::PhysicalExprRef;
 
-use crate::execution_plan::CardinalityEffect;
 use futures::stream::{Stream, StreamExt};
+use itertools::Itertools;
 use log::trace;
 
 /// Execution plan for a projection
@@ -239,6 +245,19 @@ impl ExecutionPlan for ProjectionExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let maybe_unified = try_unifying_projections(projection, self)?;
+        if let Some(new_plan) = maybe_unified {
+            // To unify 3 or more sequential projections:
+            remove_unnecessary_projections(new_plan).data().map(Some)
+        } else {
+            Ok(Some(Arc::new(projection.clone())))
+        }
+    }
 }
 
 /// If 'e' is a direct column reference, returns the field level
@@ -353,15 +372,714 @@ impl RecordBatchStream for ProjectionStream {
     }
 }
 
+pub trait EmbeddedProjection: ExecutionPlan + Sized {
+    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
+}
+
+/// Some projections cannot be pushed down to the left or right input of a
+/// hash join, because the filter or the `on` clause may need columns that the
+/// final output does not use. By embedding such a projection into the hash
+/// join, we reduce the cost of `build_batch_from_indices` (which must call
+/// `compute::take()` for every column) and avoid creating unnecessary output.
+pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
+    projection: &ProjectionExec,
+    execution_plan: &Exec,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    // Collect all column indices from the given projection expressions.
+    let projection_index = collect_column_indices(projection.expr());
+
+    if projection_index.is_empty() {
+        return Ok(None);
+    };
+
+    // If the projection indices are the same as the input columns, we do not
+    // need to embed the projection into the join; i.e. check that
+    // projection_index is 0..n-1 and that its length equals the number of
+    // fields in the execution_plan schema.
+    if projection_index.len() == projection_index.last().unwrap() + 1
+        && projection_index.len() == execution_plan.schema().fields().len()
+    {
+        return Ok(None);
+    }
+
+    let new_execution_plan =
+        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
+
+    // Build projection expressions for update_expr. Zip the projection_index with the new_execution_plan output schema fields.
+    let embed_project_exprs = projection_index
+        .iter()
+        .zip(new_execution_plan.schema().fields())
+        .map(|(index, field)| {
+            (
+                Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
+                field.name().to_owned(),
+            )
+        })
+        .collect::<Vec<_>>();
+
+    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
+
+    for (expr, alias) in projection.expr() {
+        // Update the column indices in the projection expression, since the
+        // input schema has changed.
+        let Some(expr) = update_expr(expr, embed_project_exprs.as_slice(), false)? else {
+            return Ok(None);
+        };
+        new_projection_exprs.push((expr, alias.clone()));
+    }
+    // The old projection may contain aliases or expressions such as `a + 1`
+    // and `CAST('true' AS BOOLEAN)`, while the embedded projection contains
+    // only columns, so we create a new projection on top to preserve the
+    // original output.
+    let new_projection = Arc::new(ProjectionExec::try_new(
+        new_projection_exprs,
+        Arc::clone(&new_execution_plan) as _,
+    )?);
+    if is_projection_removable(&new_projection) {
+        Ok(Some(new_execution_plan))
+    } else {
+        Ok(Some(new_projection))
+    }
+}
+
+/// The on clause of the join, as a vector of (left, right) columns.
+pub type JoinOn = Vec<(PhysicalExprRef, PhysicalExprRef)>;
+/// Reference for JoinOn.
+pub type JoinOnRef<'a> = &'a [(PhysicalExprRef, PhysicalExprRef)];
+
+pub struct JoinData {
+    pub projected_left_child: ProjectionExec,
+    pub projected_right_child: ProjectionExec,
+    pub join_filter: Option<JoinFilter>,
+    pub join_on: JoinOn,
+}
+
+pub fn try_pushdown_through_join(
+    projection: &ProjectionExec,
+    join_left: &Arc<dyn ExecutionPlan>,
+    join_right: &Arc<dyn ExecutionPlan>,
+    join_on: JoinOnRef,
+    schema: SchemaRef,
+    filter: Option<&JoinFilter>,
+) -> Result<Option<JoinData>> {
+    // Convert projected expressions to columns. We cannot proceed if this is not possible.
+ let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else { + return Ok(None); + }; + + let (far_right_left_col_ind, far_left_right_col_ind) = + join_table_borders(join_left.schema().fields().len(), &projection_as_columns); + + if !join_allows_pushdown( + &projection_as_columns, + &schema, + far_right_left_col_ind, + far_left_right_col_ind, + ) { + return Ok(None); + } + + let new_filter = if let Some(filter) = filter { + match update_join_filter( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + filter, + join_left.schema().fields().len(), + ) { + Some(updated_filter) => Some(updated_filter), + None => return Ok(None), + } + } else { + None + }; + + let Some(new_on) = update_join_on( + &projection_as_columns[0..=far_right_left_col_ind as _], + &projection_as_columns[far_left_right_col_ind as _..], + join_on, + join_left.schema().fields().len(), + ) else { + return Ok(None); + }; + + let (new_left, new_right) = new_join_children( + &projection_as_columns, + far_right_left_col_ind, + far_left_right_col_ind, + join_left, + join_right, + )?; + + Ok(Some(JoinData { + projected_left_child: new_left, + projected_right_child: new_right, + join_filter: new_filter, + join_on: new_on, + })) +} + +/// This function checks if `plan` is a [`ProjectionExec`], and inspects its +/// input(s) to test whether it can push `plan` under its input(s). This function +/// will operate on the entire tree and may ultimately remove `plan` entirely +/// by leveraging source providers with built-in projection capabilities. +pub fn remove_unnecessary_projections( + plan: Arc, +) -> Result>> { + let maybe_modified = + if let Some(projection) = plan.as_any().downcast_ref::() { + // If the projection does not cause any change on the input, we can + // safely remove it: + if is_projection_removable(projection) { + return Ok(Transformed::yes(Arc::clone(projection.input()))); + } + // If it does, check if we can push it under its child(ren): + projection + .input() + .try_swapping_with_projection(projection)? + } else { + return Ok(Transformed::no(plan)); + }; + Ok(maybe_modified.map_or(Transformed::no(plan), Transformed::yes)) +} + +/// Compare the inputs and outputs of the projection. All expressions must be +/// columns without alias, and projection does not change the order of fields. +/// For example, if the input schema is `a, b`, `SELECT a, b` is removable, +/// but `SELECT b, a` and `SELECT a+1, b` and `SELECT a AS c, b` are not. +fn is_projection_removable(projection: &ProjectionExec) -> bool { + let exprs = projection.expr(); + exprs.iter().enumerate().all(|(idx, (expr, alias))| { + let Some(col) = expr.as_any().downcast_ref::() else { + return false; + }; + col.name() == alias && col.index() == idx + }) && exprs.len() == projection.input().schema().fields().len() +} + +/// Given the expression set of a projection, checks if the projection causes +/// any renaming or constructs a non-`Column` physical expression. +pub fn all_alias_free_columns(exprs: &[(Arc, String)]) -> bool { + exprs.iter().all(|(expr, alias)| { + expr.as_any() + .downcast_ref::() + .map(|column| column.name() == alias) + .unwrap_or(false) + }) +} + +/// Updates a source provider's projected columns according to the given +/// projection operator's expressions. To use this function safely, one must +/// ensure that all expressions are `Column` expressions without aliases. 
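
To see what `new_projections_for_columns` (defined next) computes, note that it composes two index mappings: each column index in the projection is looked up in the source's existing projection to obtain a file-schema index. A minimal, dependency-free sketch of that composition; the names here are illustrative, not DataFusion APIs:

```rust
/// Standalone model of the index composition performed by
/// `new_projections_for_columns`: each projected column index is looked up
/// in the source's existing projection to obtain file-schema indices.
fn compose_projections(projected_columns: &[usize], source: &[usize]) -> Vec<usize> {
    projected_columns.iter().map(|&i| source[i]).collect()
}

fn main() {
    // Suppose a CSV source already scans file columns [3, 0, 5], so its
    // output schema has three fields. A ProjectionExec on top selecting
    // output columns 2 and 0 should make the source scan file columns 5 and 3.
    let source_projection = vec![3, 0, 5];
    let projection_exprs = vec![2, 0];
    assert_eq!(
        compose_projections(&projection_exprs, &source_projection),
        vec![5, 3]
    );
    println!("composed projection OK");
}
```
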
+pub fn new_projections_for_columns( + projection: &ProjectionExec, + source: &[usize], +) -> Vec { + projection + .expr() + .iter() + .filter_map(|(expr, _)| { + expr.as_any() + .downcast_ref::() + .map(|expr| source[expr.index()]) + }) + .collect() +} + +/// Creates a new [`ProjectionExec`] instance with the given child plan and +/// projected expressions. +pub fn make_with_child( + projection: &ProjectionExec, + child: &Arc, +) -> Result> { + ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child)) + .map(|e| Arc::new(e) as _) +} + +/// Returns `true` if all the expressions in the argument are `Column`s. +pub fn all_columns(exprs: &[(Arc, String)]) -> bool { + exprs.iter().all(|(expr, _)| expr.as_any().is::()) +} + +/// The function operates in two modes: +/// +/// 1) When `sync_with_child` is `true`: +/// +/// The function updates the indices of `expr` if the expression resides +/// in the input plan. For instance, given the expressions `a@1 + b@2` +/// and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are +/// updated to `a@0 + b@1` and `c@2`. +/// +/// 2) When `sync_with_child` is `false`: +/// +/// The function determines how the expression would be updated if a projection +/// was placed before the plan associated with the expression. If the expression +/// cannot be rewritten after the projection, it returns `None`. For example, +/// given the expressions `c@0`, `a@1` and `b@2`, and the [`ProjectionExec`] with +/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes +/// `a@0`, but `b@2` results in `None` since the projection does not include `b`. +pub fn update_expr( + expr: &Arc, + projected_exprs: &[(Arc, String)], + sync_with_child: bool, +) -> Result>> { + #[derive(Debug, PartialEq)] + enum RewriteState { + /// The expression is unchanged. + Unchanged, + /// Some part of the expression has been rewritten + RewrittenValid, + /// Some part of the expression has been rewritten, but some column + /// references could not be. + RewrittenInvalid, + } + + let mut state = RewriteState::Unchanged; + + let new_expr = Arc::clone(expr) + .transform_up(|expr: Arc| { + if state == RewriteState::RewrittenInvalid { + return Ok(Transformed::no(expr)); + } + + let Some(column) = expr.as_any().downcast_ref::() else { + return Ok(Transformed::no(expr)); + }; + if sync_with_child { + state = RewriteState::RewrittenValid; + // Update the index of `column`: + Ok(Transformed::yes(Arc::clone( + &projected_exprs[column.index()].0, + ))) + } else { + // default to invalid, in case we can't find the relevant column + state = RewriteState::RewrittenInvalid; + // Determine how to update `column` to accommodate `projected_exprs` + projected_exprs + .iter() + .enumerate() + .find_map(|(index, (projected_expr, alias))| { + projected_expr.as_any().downcast_ref::().and_then( + |projected_column| { + (column.name().eq(projected_column.name()) + && column.index() == projected_column.index()) + .then(|| { + state = RewriteState::RewrittenValid; + Arc::new(Column::new(alias, index)) as _ + }) + }, + ) + }) + .map_or_else( + || Ok(Transformed::no(expr)), + |c| Ok(Transformed::yes(c)), + ) + } + }) + .data(); + + new_expr.map(|e| (state == RewriteState::RewrittenValid).then_some(e)) +} + +/// Downcasts all the expressions in `exprs` to `Column`s. If any of the given +/// expressions is not a `Column`, returns `None`. 
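
The `sync_with_child = false` mode of `update_expr` above is the workhorse of nearly every swap implementation in this diff, so it is worth making concrete. Below is a hedged, dependency-free model of just its column-rewriting rule; this is a sketch of the idea, not the actual `PhysicalExpr` tree rewrite:

```rust
/// A simplified stand-in for a `Column` physical expression.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: String,
    index: usize,
}

/// Model of `update_expr` with `sync_with_child = false`: a column survives
/// only if the projection re-exposes it, possibly under a new alias and at a
/// new position; otherwise the rewrite fails (`None`).
fn update_column(col: &Col, projected: &[(Col, String)]) -> Option<Col> {
    projected
        .iter()
        .enumerate()
        .find(|(_, (p, _))| p.name == col.name && p.index == col.index)
        .map(|(new_index, (_, alias))| Col {
            name: alias.clone(),
            index: new_index,
        })
}

fn main() {
    // Projection with output schema `a, c_new`, built from inputs a@1 and c@0:
    let projected = vec![
        (Col { name: "a".into(), index: 1 }, "a".into()),
        (Col { name: "c".into(), index: 0 }, "c_new".into()),
    ];
    // As in the doc comment above: c@0 becomes c_new@1, a@1 becomes a@0,
    // and b@2 cannot be rewritten because the projection drops it.
    assert_eq!(
        update_column(&Col { name: "c".into(), index: 0 }, &projected),
        Some(Col { name: "c_new".into(), index: 1 })
    );
    assert_eq!(
        update_column(&Col { name: "a".into(), index: 1 }, &projected),
        Some(Col { name: "a".into(), index: 0 })
    );
    assert_eq!(update_column(&Col { name: "b".into(), index: 2 }, &projected), None);
}
```
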
+pub fn physical_to_column_exprs( + exprs: &[(Arc, String)], +) -> Option> { + exprs + .iter() + .map(|(expr, alias)| { + expr.as_any() + .downcast_ref::() + .map(|col| (col.clone(), alias.clone())) + }) + .collect() +} + +/// If pushing down the projection over this join's children seems possible, +/// this function constructs the new [`ProjectionExec`]s that will come on top +/// of the original children of the join. +pub fn new_join_children( + projection_as_columns: &[(Column, String)], + far_right_left_col_ind: i32, + far_left_right_col_ind: i32, + left_child: &Arc, + right_child: &Arc, +) -> Result<(ProjectionExec, ProjectionExec)> { + let new_left = ProjectionExec::try_new( + projection_as_columns[0..=far_right_left_col_ind as _] + .iter() + .map(|(col, alias)| { + ( + Arc::new(Column::new(col.name(), col.index())) as _, + alias.clone(), + ) + }) + .collect_vec(), + Arc::clone(left_child), + )?; + let left_size = left_child.schema().fields().len() as i32; + let new_right = ProjectionExec::try_new( + projection_as_columns[far_left_right_col_ind as _..] + .iter() + .map(|(col, alias)| { + ( + Arc::new(Column::new( + col.name(), + // Align projected expressions coming from the right + // table with the new right child projection: + (col.index() as i32 - left_size) as _, + )) as _, + alias.clone(), + ) + }) + .collect_vec(), + Arc::clone(right_child), + )?; + + Ok((new_left, new_right)) +} + +/// Checks three conditions for pushing a projection down through a join: +/// - Projection must narrow the join output schema. +/// - Columns coming from left/right tables must be collected at the left/right +/// sides of the output table. +/// - Left or right table is not lost after the projection. +pub fn join_allows_pushdown( + projection_as_columns: &[(Column, String)], + join_schema: &SchemaRef, + far_right_left_col_ind: i32, + far_left_right_col_ind: i32, +) -> bool { + // Projection must narrow the join output: + projection_as_columns.len() < join_schema.fields().len() + // Are the columns from different tables mixed? + && (far_right_left_col_ind + 1 == far_left_right_col_ind) + // Left or right table is not lost after the projection. + && far_right_left_col_ind >= 0 + && far_left_right_col_ind < projection_as_columns.len() as i32 +} + +/// Returns the last index before encountering a column coming from the right table when traveling +/// through the projection from left to right, and the last index before encountering a column +/// coming from the left table when traveling through the projection from right to left. +/// If there is no column in the projection coming from the left side, it returns (-1, ...), +/// if there is no column in the projection coming from the right side, it returns (..., projection length). 
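
As a worked example of the border computation described above, here is a standalone model; it is illustrative only, since the real function below operates on `(Column, String)` pairs rather than bare indices:

```rust
/// Standalone model of `join_table_borders`: positions whose source index is
/// < left_count must form a prefix of the projection, and the rest a suffix.
fn borders(left_count: usize, projected_source_indices: &[usize]) -> (i32, i32) {
    // Last position (scanning left to right) still referring to the left table:
    let far_right_left = projected_source_indices
        .iter()
        .take_while(|&&i| i < left_count)
        .count() as i32
        - 1;
    // First position of the trailing run of right-table columns:
    let far_left_right = projected_source_indices.len() as i32
        - projected_source_indices
            .iter()
            .rev()
            .take_while(|&&i| i >= left_count)
            .count() as i32;
    (far_right_left, far_left_right)
}

fn main() {
    // Projection on join-schema columns [1, 2, 4, 3, 2, 5, 7, 6] where the
    // left table has 5 columns: positions 0..=4 are left-side, 5.. right-side.
    assert_eq!(borders(5, &[1, 2, 4, 3, 2, 5, 7, 6]), (4, 5));
    // With only 1 left column, even position 0 refers to the right table:
    assert_eq!(borders(1, &[1, 2, 4, 3, 2, 5, 7, 6]), (-1, 0));
}
```
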
+pub fn join_table_borders( + left_table_column_count: usize, + projection_as_columns: &[(Column, String)], +) -> (i32, i32) { + let far_right_left_col_ind = projection_as_columns + .iter() + .enumerate() + .take_while(|(_, (projection_column, _))| { + projection_column.index() < left_table_column_count + }) + .last() + .map(|(index, _)| index as i32) + .unwrap_or(-1); + + let far_left_right_col_ind = projection_as_columns + .iter() + .enumerate() + .rev() + .take_while(|(_, (projection_column, _))| { + projection_column.index() >= left_table_column_count + }) + .last() + .map(|(index, _)| index as i32) + .unwrap_or(projection_as_columns.len() as i32); + + (far_right_left_col_ind, far_left_right_col_ind) +} + +/// Tries to update the equi-join `Column`'s of a join as if the input of +/// the join was replaced by a projection. +pub fn update_join_on( + proj_left_exprs: &[(Column, String)], + proj_right_exprs: &[(Column, String)], + hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)], + left_field_size: usize, +) -> Option> { + // TODO: Clippy wants the "map" call removed, but doing so generates + // a compilation error. Remove the clippy directive once this + // issue is fixed. + #[allow(clippy::map_identity)] + let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on + .iter() + .map(|(left, right)| (left, right)) + .unzip(); + + let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0); + let new_right_columns = + new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size); + + match (new_left_columns, new_right_columns) { + (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()), + _ => None, + } +} + +/// Tries to update the column indices of a [`JoinFilter`] as if the input of +/// the join was replaced by a projection. +pub fn update_join_filter( + projection_left_exprs: &[(Column, String)], + projection_right_exprs: &[(Column, String)], + join_filter: &JoinFilter, + left_field_size: usize, +) -> Option { + let mut new_left_indices = new_indices_for_join_filter( + join_filter, + JoinSide::Left, + projection_left_exprs, + 0, + ) + .into_iter(); + let mut new_right_indices = new_indices_for_join_filter( + join_filter, + JoinSide::Right, + projection_right_exprs, + left_field_size, + ) + .into_iter(); + + // Check if all columns match: + (new_right_indices.len() + new_left_indices.len() + == join_filter.column_indices().len()) + .then(|| { + JoinFilter::new( + Arc::clone(join_filter.expression()), + join_filter + .column_indices() + .iter() + .map(|col_idx| ColumnIndex { + index: if col_idx.side == JoinSide::Left { + new_left_indices.next().unwrap() + } else { + new_right_indices.next().unwrap() + }, + side: col_idx.side, + }) + .collect(), + Arc::clone(join_filter.schema()), + ) + }) +} + +/// Unifies `projection` with its input (which is also a [`ProjectionExec`]). +fn try_unifying_projections( + projection: &ProjectionExec, + child: &ProjectionExec, +) -> Result>> { + let mut projected_exprs = vec![]; + let mut column_ref_map: HashMap = HashMap::new(); + + // Collect the column references usage in the outer projection. 
+    projection.expr().iter().for_each(|(expr, _)| {
+        expr.apply(|expr| {
+            Ok({
+                if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+                    *column_ref_map.entry(column.clone()).or_default() += 1;
+                }
+                TreeNodeRecursion::Continue
+            })
+        })
+        .unwrap();
+    });
+    // Merging projections is not always beneficial: if a non-trivial
+    // expression in the child is referred to more than once, keeping the
+    // projections separate lets the child act as a cache for that
+    // computation, so we bail out here.
+    // See discussion in: https://github.com/apache/datafusion/issues/8296
+    if column_ref_map.iter().any(|(column, count)| {
+        *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].0))
+    }) {
+        return Ok(None);
+    }
+    for (expr, alias) in projection.expr() {
+        // If there is no match in the input projection, we cannot unify these
+        // projections. This case will arise if the projection expression contains
+        // a `PhysicalExpr` variant `update_expr` doesn't support.
+        let Some(expr) = update_expr(expr, child.expr(), true)? else {
+            return Ok(None);
+        };
+        projected_exprs.push((expr, alias.clone()));
+    }
+    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
+        .map(|e| Some(Arc::new(e) as _))
+}
+
+/// Collect all column indices from the given projection expressions.
+fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usize> {
+    // Collect indices and remove duplicates.
+    let mut indices = exprs
+        .iter()
+        .flat_map(|(expr, _)| collect_columns(expr))
+        .map(|x| x.index())
+        .collect::<std::collections::HashSet<_>>()
+        .into_iter()
+        .collect::<Vec<_>>();
+    indices.sort();
+    indices
+}
+
+/// This function determines and returns a vector of indices representing the
+/// positions of columns in `projection_exprs` that are involved in `join_filter`,
+/// and correspond to a particular side (`join_side`) of the join operation.
+///
+/// Notes: Column indices in the projection expressions are based on the join schema,
+/// whereas the join filter is based on the join child schema. `column_index_offset`
+/// represents the offset between them.
+fn new_indices_for_join_filter(
+    join_filter: &JoinFilter,
+    join_side: JoinSide,
+    projection_exprs: &[(Column, String)],
+    column_index_offset: usize,
+) -> Vec<usize> {
+    join_filter
+        .column_indices()
+        .iter()
+        .filter(|col_idx| col_idx.side == join_side)
+        .filter_map(|col_idx| {
+            projection_exprs
+                .iter()
+                .position(|(col, _)| col_idx.index + column_index_offset == col.index())
+        })
+        .collect()
+}
+
+/// This function generates a new set of columns to be used in a hash join
+/// operation based on a set of equi-join conditions (`hash_join_on`) and a
+/// list of projection expressions (`projection_exprs`).
+///
+/// Notes: Column indices in the projection expressions are based on the join schema,
+/// whereas the join on expressions are based on the join child schema. `column_index_offset`
+/// represents the offset between them.
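
The offset convention in these notes deserves a concrete example: in the join output schema, right-side columns appear after all left-side columns, so a right-child index `i` corresponds to join-schema index `i + left_field_size` before searching the projection. A hedged, dependency-free sketch:

```rust
/// Model of the index translation used by `new_columns_for_join_on` and
/// `new_indices_for_join_filter`: a child-schema index plus the offset
/// (0 for the left child, left_field_size for the right child) must match
/// the join-schema index recorded in the projection.
fn position_in_projection(
    child_index: usize,
    column_index_offset: usize,
    projected_join_indices: &[usize],
) -> Option<usize> {
    projected_join_indices
        .iter()
        .position(|&join_index| child_index + column_index_offset == join_index)
}

fn main() {
    // A join of a 3-column left table with a 2-column right table has join
    // schema indices 0..=4; suppose the projection keeps [0, 2, 3].
    let projected = [0, 2, 3];
    // Right-child column 0 lives at join index 0 + 3 = 3 => projection slot 2.
    assert_eq!(position_in_projection(0, 3, &projected), Some(2));
    // Left-child column 1 (join index 1) was projected away.
    assert_eq!(position_in_projection(1, 0, &projected), None);
}
```
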
+fn new_columns_for_join_on( + hash_join_on: &[&PhysicalExprRef], + projection_exprs: &[(Column, String)], + column_index_offset: usize, +) -> Option> { + let new_columns = hash_join_on + .iter() + .filter_map(|on| { + // Rewrite all columns in `on` + Arc::clone(*on) + .transform(|expr| { + if let Some(column) = expr.as_any().downcast_ref::() { + // Find the column in the projection expressions + let new_column = projection_exprs + .iter() + .enumerate() + .find(|(_, (proj_column, _))| { + column.name() == proj_column.name() + && column.index() + column_index_offset + == proj_column.index() + }) + .map(|(index, (_, alias))| Column::new(alias, index)); + if let Some(new_column) = new_column { + Ok(Transformed::yes(Arc::new(new_column))) + } else { + // If the column is not found in the projection expressions, + // it means that the column is not projected. In this case, + // we cannot push the projection down. + internal_err!( + "Column {:?} not found in projection expressions", + column + ) + } + } else { + Ok(Transformed::no(expr)) + } + }) + .data() + .ok() + }) + .collect::>(); + (new_columns.len() == hash_join_on.len()).then_some(new_columns) +} + +/// Checks if the given expression is trivial. +/// An expression is considered trivial if it is either a `Column` or a `Literal`. +fn is_expr_trivial(expr: &Arc) -> bool { + expr.as_any().downcast_ref::().is_some() + || expr.as_any().downcast_ref::().is_some() +} + #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; + use crate::common::collect; use crate::test; use arrow_schema::DataType; use datafusion_common::ScalarValue; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; + + #[test] + fn test_collect_column_indices() -> Result<()> { + let expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 7)), + Operator::Minus, + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + Operator::Plus, + Arc::new(Column::new("a", 1)), + )), + )); + let column_indices = collect_column_indices(&[(expr, "b-(1+a)".to_string())]); + assert_eq!(column_indices, vec![1, 7]); + Ok(()) + } + + #[test] + fn test_join_table_borders() -> Result<()> { + let projections = vec![ + (Column::new("b", 1), "b".to_owned()), + (Column::new("c", 2), "c".to_owned()), + (Column::new("e", 4), "e".to_owned()), + (Column::new("d", 3), "d".to_owned()), + (Column::new("c", 2), "c".to_owned()), + (Column::new("f", 5), "f".to_owned()), + (Column::new("h", 7), "h".to_owned()), + (Column::new("g", 6), "g".to_owned()), + ]; + let left_table_column_count = 5; + assert_eq!( + join_table_borders(left_table_column_count, &projections), + (4, 5) + ); + + let left_table_column_count = 8; + assert_eq!( + join_table_borders(left_table_column_count, &projections), + (7, 8) + ); + + let left_table_column_count = 1; + assert_eq!( + join_table_borders(left_table_column_count, &projections), + (-1, 0) + ); + + let projections = vec![ + (Column::new("a", 0), "a".to_owned()), + (Column::new("b", 1), "b".to_owned()), + (Column::new("d", 3), "d".to_owned()), + (Column::new("g", 6), "g".to_owned()), + (Column::new("e", 4), "e".to_owned()), + (Column::new("f", 5), "f".to_owned()), + (Column::new("e", 4), "e".to_owned()), + (Column::new("h", 7), "h".to_owned()), + ]; + let left_table_column_count = 5; + assert_eq!( + join_table_borders(left_table_column_count, &projections), + (2, 7) + ); + + let left_table_column_count = 7; + assert_eq!( + join_table_borders(left_table_column_count, &projections), + (6, 
7) + ); + + Ok(()) + } + #[tokio::test] async fn project_no_column() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 270aabeb553c..8d180c212eba 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -32,6 +32,7 @@ use super::{ use crate::execution_plan::CardinalityEffect; use crate::hash_utils::create_hashes; use crate::metrics::BaselineMetrics; +use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec}; use crate::repartition::distributor_channels::{ channels, partition_aware_channels, DistributionReceiver, DistributionSender, }; @@ -44,6 +45,7 @@ use arrow::datatypes::{SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; use arrow_array::{PrimitiveArray, RecordBatchOptions}; use datafusion_common::utils::transpose; +use datafusion_common::HashMap; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; @@ -51,7 +53,6 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_common::HashMap; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::trace; @@ -672,6 +673,46 @@ impl ExecutionPlan for RepartitionExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection does not narrow the schema, we should not try to push it down. + if projection.expr().len() >= projection.input().schema().fields().len() { + return Ok(None); + } + + // If pushdown is not beneficial or applicable, break it. + if projection.benefits_from_input_partitioning()[0] + || !all_columns(projection.expr()) + { + return Ok(None); + } + + let new_projection = make_with_child(projection, self.input())?; + + let new_partitioning = match self.partitioning() { + Partitioning::Hash(partitions, size) => { + let mut new_partitions = vec![]; + for partition in partitions { + let Some(new_partition) = + update_expr(partition, projection.expr(), false)? + else { + return Ok(None); + }; + new_partitions.push(new_partition); + } + Partitioning::Hash(new_partitions, *size) + } + others => others.clone(), + }; + + Ok(Some(Arc::new(RepartitionExec::try_new( + new_projection, + new_partitioning, + )?))) + } } impl RepartitionExec { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index fd7e426a82c5..caa9a754a0d2 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -31,6 +31,7 @@ use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; +use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::{ get_record_batch_memory_size, read_spill_as_stream, spill_record_batches, @@ -1024,6 +1025,37 @@ impl ExecutionPlan for SortExec { CardinalityEffect::LowerEqual } } + + /// Tries to swap the projection with its input [`SortExec`]. If it can be done, + /// it returns the new swapped version having the [`SortExec`] as the top plan. 
+ /// Otherwise, it returns None. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection does not narrow the schema, we should not try to push it down. + if projection.expr().len() >= projection.input().schema().fields().len() { + return Ok(None); + } + + let mut updated_exprs = LexOrdering::default(); + for sort in self.expr() { + let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)? + else { + return Ok(None); + }; + updated_exprs.push(PhysicalSortExpr { + expr: new_expr, + options: sort.options, + }); + } + + Ok(Some(Arc::new( + SortExec::new(updated_exprs, make_with_child(projection, self.input())?) + .with_fetch(self.fetch()) + .with_preserve_partitioning(self.preserve_partitioning()), + ))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index adcb28e538fd..4f4237204fb1 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::limit::LimitStream; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, @@ -32,8 +33,9 @@ use crate::{ use datafusion_common::{internal_err, Result}; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; - +use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; + use log::{debug, trace}; /// Sort preserving merge execution plan @@ -334,6 +336,39 @@ impl ExecutionPlan for SortPreservingMergeExec { fn supports_limit_pushdown(&self) -> bool { true } + + /// Tries to swap the projection with its input [`SortPreservingMergeExec`]. + /// If this is possible, it returns the new [`SortPreservingMergeExec`] whose + /// child is a projection. Otherwise, it returns None. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection does not narrow the schema, we should not try to push it down. + if projection.expr().len() >= projection.input().schema().fields().len() { + return Ok(None); + } + + let mut updated_exprs = LexOrdering::default(); + for sort in self.expr() { + let Some(updated_expr) = update_expr(&sort.expr, projection.expr(), false)? 
+ else { + return Ok(None); + }; + updated_exprs.push(PhysicalSortExpr { + expr: updated_expr, + options: sort.options, + }); + } + + Ok(Some(Arc::new( + SortPreservingMergeExec::new( + updated_exprs, + make_with_child(projection, self.input())?, + ) + .with_fetch(self.fetch()), + ))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index da8b0e877dcc..751af9921448 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -24,6 +24,11 @@ use std::sync::Arc; use super::{DisplayAs, DisplayFormatType, PlanProperties}; use crate::display::{display_orderings, ProjectSchemaDisplay}; use crate::execution_plan::{Boundedness, EmissionType}; +use crate::limit::LimitStream; +use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::projection::{ + all_alias_free_columns, new_projections_for_columns, update_expr, ProjectionExec, +}; use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; @@ -31,10 +36,8 @@ use arrow::datatypes::SchemaRef; use arrow_schema::Schema; use datafusion_common::{internal_err, plan_err, Result}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; +use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; -use crate::limit::LimitStream; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use async_trait::async_trait; use futures::stream::StreamExt; use log::debug; @@ -272,6 +275,53 @@ impl ExecutionPlan for StreamingTableExec { }) } + /// Tries to embed `projection` to its input (`streaming table`). + /// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise, + /// returns `None`. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + if !all_alias_free_columns(projection.expr()) { + return Ok(None); + } + + let streaming_table_projections = + self.projection().as_ref().map(|i| i.as_ref().to_vec()); + let new_projections = new_projections_for_columns( + projection, + &streaming_table_projections + .unwrap_or((0..self.schema().fields().len()).collect()), + ); + + let mut lex_orderings = vec![]; + for lex_ordering in self.projected_output_ordering().into_iter() { + let mut orderings = LexOrdering::default(); + for order in lex_ordering { + let Some(new_ordering) = + update_expr(&order.expr, projection.expr(), false)? 
+ else { + return Ok(None); + }; + orderings.push(PhysicalSortExpr { + expr: new_ordering, + options: order.options, + }); + } + lex_orderings.push(orderings); + } + + StreamingTableExec::try_new( + Arc::clone(self.partition_schema()), + self.partitions().clone(), + Some(new_projections.as_ref()), + lex_orderings, + self.is_infinite(), + self.limit(), + ) + .map(|e| Some(Arc::new(e) as _)) + } + fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index bcd9572f45c7..bacd02398ec0 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -36,6 +36,7 @@ use crate::execution_plan::{ boundedness_from_children, emission_type_from_children, InvariantLevel, }; use crate::metrics::BaselineMetrics; +use crate::projection::{make_with_child, ProjectionExec}; use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; @@ -271,6 +272,27 @@ impl ExecutionPlan for UnionExec { fn supports_limit_pushdown(&self) -> bool { true } + + /// Tries to push `projection` down through `union`. If possible, performs the + /// pushdown and returns a new [`UnionExec`] as the top plan which has projections + /// as its children. Otherwise, returns `None`. + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection doesn't narrow the schema, we shouldn't try to push it down. + if projection.expr().len() >= projection.input().schema().fields().len() { + return Ok(None); + } + + let new_children = self + .children() + .into_iter() + .map(|child| make_with_child(projection, child)) + .collect::>>()?; + + Ok(Some(Arc::new(UnionExec::new(new_children)))) + } } /// Combines multiple input streams by interleaving them. 
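
Taken together, the implementations above follow a recipe that custom operators can reuse: guard on schema narrowing, rewrite any operator-internal expressions with `update_expr`, and rebuild the projection below with `make_with_child`. A hedged sketch of the simplest (expression-free, single-child) case, assuming the `datafusion-physical-plan` crate as modified in this diff; `push_below_single_child` is an illustrative helper, not part of the PR:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_plan::projection::{make_with_child, ProjectionExec};
use datafusion_physical_plan::ExecutionPlan;

/// Illustrative helper: the shared shape of the single-child swap
/// implementations above (union, coalesce, repartition, sort).
fn push_below_single_child(
    projection: &ProjectionExec,
    child: &Arc<dyn ExecutionPlan>,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // Pushing a projection that does not narrow the schema only adds work,
    // so bail out exactly as the operators above do.
    if projection.expr().len() >= projection.input().schema().fields().len() {
        return Ok(None);
    }
    // Re-create the projection on top of `child`; in a real implementation
    // the caller then re-wraps this result in its own operator type.
    make_with_child(projection, child).map(Some)
}
```

Operators that carry expressions (sort keys, partitioning, predicates) additionally rewrite those through `update_expr` and return `Ok(None)` when any rewrite fails, as the `SortExec` and `RepartitionExec` hunks above show.
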
diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 02f66f342b8a..0798c8cc1453 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -840,4 +840,3 @@ DROP TABLE t1; statement ok DROP TABLE t2; - diff --git a/datafusion/sqllogictest/test_files/encoding.slt b/datafusion/sqllogictest/test_files/encoding.slt index be1c5aa40583..960e81f4d14c 100644 --- a/datafusion/sqllogictest/test_files/encoding.slt +++ b/datafusion/sqllogictest/test_files/encoding.slt @@ -125,4 +125,4 @@ NULL NULL NULL R Ug 52 query T select encode(digest('hello', 'sha256'), 'hex'); ---- -2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 \ No newline at end of file +2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt index a3592ba159e2..b2aa4b3fac8f 100644 --- a/datafusion/sqllogictest/test_files/expr.slt +++ b/datafusion/sqllogictest/test_files/expr.slt @@ -2046,4 +2046,4 @@ select 1 where null between null and 2; query T select 'A' where null between 2 and null; ---- +---- diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 6933514951b3..8895a2986103 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1385,4 +1385,4 @@ WHERE moving_sum > 60; item1 1970-01-01T00:00:03 75 statement ok -drop table source_table; \ No newline at end of file +drop table source_table; diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part index c60848cb56c1..34d63f67efdf 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part @@ -57,4 +57,4 @@ physical_plan 08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus] 09)----------------CoalesceBatchesExec: target_batch_size=8192 10)------------------FilterExec: l_shipdate@6 <= 1998-09-02, projection=[l_quantity@0, l_extendedprice@1, l_discount@2, l_tax@3, l_returnflag@4, l_linestatus@5] -11)--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false \ No newline at end of file +11)--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, 
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
index 6645ede2a73b..3928684a6824 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
@@ -123,4 +123,3 @@ physical_plan
 48)------------------------FilterExec: n_name@1 = GERMANY, projection=[n_nationkey@0]
 49)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 50)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false
-
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
index e9486c39f08c..93021d371e6f 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
@@ -97,4 +97,3 @@ physical_plan
 30)--------------------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
 31)----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 32)------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], has_header=false
-
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
index f28a5ef54885..5fc2973d6fa2 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q17.slt.part
@@ -74,4 +74,3 @@ physical_plan
 21)------------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4
 22)--------------------AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)]
 23)----------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity], has_header=false
-
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part
index e3918ba62b02..4d0cb1bc7e1d 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part
@@ -124,4 +124,3 @@ physical_plan
 41)--------------------------CoalesceBatchesExec: target_batch_size=8192
 42)----------------------------FilterExec: l_shipdate@3 >= 1994-01-01 AND l_shipdate@3 < 1995-01-01, projection=[l_partkey@0, l_suppkey@1, l_quantity@2]
 43)------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], has_header=false
-
diff --git a/datafusion/sqllogictest/test_files/tpch/tpch.slt b/datafusion/sqllogictest/test_files/tpch/tpch.slt
index 127152daa580..c6d630997e29 100644
--- a/datafusion/sqllogictest/test_files/tpch/tpch.slt
+++ b/datafusion/sqllogictest/test_files/tpch/tpch.slt
@@ -28,4 +28,3 @@ set datafusion.optimizer.prefer_hash_join = false;

 include ./answers/q*.slt.part
 include ./drop_tables.slt.part
-
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 889810611cf1..3e387008ebcc 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1767,7 +1767,7 @@ logical_plan
 01)Projection: count(*) AS global_count
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
 03)----SubqueryAlias: a
-04)------Projection: 
+04)------Projection:
 05)--------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
 06)----------Projection: aggregate_test_100.c1
 07)------------Filter: aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")