diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 682342d27b29..0534b04f5dc3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -38,9 +38,8 @@ use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; use crate::logical_plan::{DmlStatement, Statement}; use crate::utils::{ - enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan, - find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist, - split_conjunction, + enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, + grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction, }; use crate::{ build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute, @@ -3494,11 +3493,10 @@ fn calc_func_dependencies_for_project( .flatten() .collect::>(); - let len = exprlist_len(exprs, input.schema(), Some(find_base_plan(input).schema()))?; Ok(input .schema() .functional_dependencies() - .project_functional_dependencies(&proj_indices, len)) + .project_functional_dependencies(&proj_indices, exprs.len())) } /// Sorts its input according to a list of sort expressions. diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 3404cce17188..9d7def76a11e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -19,7 +19,6 @@ use std::cmp::Ordering; use std::collections::{BTreeSet, HashSet}; -use std::ops::Deref; use std::sync::Arc; use crate::expr::{Alias, Sort, WildcardOptions, WindowFunction, WindowFunctionParams}; @@ -696,165 +695,11 @@ pub fn exprlist_to_fields<'a>( plan: &LogicalPlan, ) -> Result, Arc)>> { // Look for exact match in plan's output schema - let wildcard_schema = find_base_plan(plan).schema(); let input_schema = plan.schema(); - let result = exprs - .into_iter() - .map(|e| match e { - #[expect(deprecated)] - Expr::Wildcard { qualifier, options } => match qualifier { - None => { - let mut excluded = exclude_using_columns(plan)?; - excluded.extend(get_excluded_columns( - options.exclude.as_ref(), - options.except.as_ref(), - wildcard_schema, - None, - )?); - Ok(wildcard_schema - .iter() - .filter(|(q, f)| { - !excluded.contains(&Column::new(q.cloned(), f.name())) - }) - .map(|(q, f)| (q.cloned(), Arc::clone(f))) - .collect::>()) - } - Some(qualifier) => { - let excluded: Vec = get_excluded_columns( - options.exclude.as_ref(), - options.except.as_ref(), - wildcard_schema, - Some(qualifier), - )? - .into_iter() - .map(|c| c.flat_name()) - .collect(); - Ok(wildcard_schema - .fields_with_qualified(qualifier) - .into_iter() - .filter_map(|field| { - let flat_name = format!("{}.{}", qualifier, field.name()); - if excluded.contains(&flat_name) { - None - } else { - Some(( - Some(qualifier.clone()), - Arc::new(field.to_owned()), - )) - } - }) - .collect::>()) - } - }, - _ => Ok(vec![e.to_field(input_schema)?]), - }) - .collect::>>()? - .into_iter() - .flatten() - .collect(); - Ok(result) -} - -/// Find the suitable base plan to expand the wildcard expression recursively. -/// When planning [LogicalPlan::Window] and [LogicalPlan::Aggregate], we will generate -/// an intermediate plan based on the relation plan (e.g. [LogicalPlan::TableScan], [LogicalPlan::Subquery], ...). -/// If we expand a wildcard expression basing the intermediate plan, we could get some duplicate fields. -pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan { - match input { - LogicalPlan::Window(window) => find_base_plan(&window.input), - LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input), - // [SqlToRel::try_process_unnest] will convert Expr(Unnest(Expr)) to Projection/Unnest/Projection - // We should expand the wildcard expression based on the input plan of the inner Projection. - LogicalPlan::Unnest(unnest) => { - if let LogicalPlan::Projection(projection) = unnest.input.deref() { - find_base_plan(&projection.input) - } else { - input - } - } - LogicalPlan::Filter(filter) => { - if filter.having { - // If a filter is used for a having clause, its input plan is an aggregation. - // We should expand the wildcard expression based on the aggregation's input plan. - find_base_plan(&filter.input) - } else { - input - } - } - _ => input, - } -} - -/// Count the number of real fields. We should expand the wildcard expression to get the actual number. -pub fn exprlist_len( - exprs: &[Expr], - schema: &DFSchemaRef, - wildcard_schema: Option<&DFSchemaRef>, -) -> Result { exprs - .iter() - .map(|e| match e { - #[expect(deprecated)] - Expr::Wildcard { - qualifier: None, - options, - } => { - let excluded = get_excluded_columns( - options.exclude.as_ref(), - options.except.as_ref(), - wildcard_schema.unwrap_or(schema), - None, - )? - .into_iter() - .collect::>(); - Ok( - get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded) - .len(), - ) - } - #[expect(deprecated)] - Expr::Wildcard { - qualifier: Some(qualifier), - options, - } => { - let related_wildcard_schema = wildcard_schema.as_ref().map_or_else( - || Ok(Arc::clone(schema)), - |schema| { - // Eliminate the fields coming from other tables. - let qualified_fields = schema - .fields() - .iter() - .enumerate() - .filter_map(|(idx, field)| { - let (maybe_table_ref, _) = schema.qualified_field(idx); - if maybe_table_ref.is_none_or(|q| q == qualifier) { - Some((maybe_table_ref.cloned(), Arc::clone(field))) - } else { - None - } - }) - .collect::>(); - let metadata = schema.metadata().clone(); - DFSchema::new_with_metadata(qualified_fields, metadata) - .map(Arc::new) - }, - )?; - let excluded = get_excluded_columns( - options.exclude.as_ref(), - options.except.as_ref(), - related_wildcard_schema.as_ref(), - Some(qualifier), - )? - .into_iter() - .collect::>(); - Ok( - get_exprs_except_skipped(related_wildcard_schema.as_ref(), excluded) - .len(), - ) - } - _ => Ok(1), - }) - .sum() + .into_iter() + .map(|e| e.to_field(input_schema)) + .collect() } /// Convert an expression into Column expression if it's already provided as input plan.