diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 91e9b6789fda..e4f57b0d9798 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -30,7 +30,6 @@ use datafusion_catalog::Session; use datafusion_common::config::ConfigOptions; use datafusion_common::Column; use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown}; -use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule; use datafusion_optimizer::analyzer::type_coercion::TypeCoercion; use datafusion_optimizer::Analyzer; @@ -68,11 +67,11 @@ impl ViewTable { fn apply_required_rule(logical_plan: LogicalPlan) -> Result { let options = ConfigOptions::default(); - Analyzer::with_rules(vec![ - Arc::new(ExpandWildcardRule::new()), - Arc::new(TypeCoercion::new()), - ]) - .execute_and_check(logical_plan, &options, |_, _| {}) + Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]).execute_and_check( + logical_plan, + &options, + |_, _| {}, + ) } /// Get definition ref diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs deleted file mode 100644 index 8015ebfc7534..000000000000 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ /dev/null @@ -1,333 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use crate::AnalyzerRule; -use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TransformedResult}; -use datafusion_common::{Column, Result}; -use datafusion_expr::builder::validate_unique_names; -use datafusion_expr::expr::PlannedReplaceSelectItem; -use datafusion_expr::utils::{ - expand_qualified_wildcard, expand_wildcard, find_base_plan, -}; -use datafusion_expr::{ - Distinct, DistinctOn, Expr, LogicalPlan, Projection, SubqueryAlias, -}; - -#[derive(Default, Debug)] -pub struct ExpandWildcardRule {} - -impl ExpandWildcardRule { - pub fn new() -> Self { - Self {} - } -} - -impl AnalyzerRule for ExpandWildcardRule { - fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - // Because the wildcard expansion is based on the schema of the input plan, - // using `transform_up_with_subqueries` here. - plan.transform_up_with_subqueries(expand_internal).data() - } - - fn name(&self) -> &str { - "expand_wildcard_rule" - } -} - -fn expand_internal(plan: LogicalPlan) -> Result> { - match plan { - LogicalPlan::Projection(Projection { expr, input, .. }) => { - let projected_expr = expand_exprlist(&input, expr)?; - validate_unique_names("Projections", projected_expr.iter())?; - Ok(Transformed::yes( - Projection::try_new(projected_expr, Arc::clone(&input)) - .map(LogicalPlan::Projection)?, - )) - } - // The schema of the plan should also be updated if the child plan is transformed. - LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { - Ok(Transformed::yes( - SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias)?, - )) - } - LogicalPlan::Distinct(Distinct::On(distinct_on)) => { - let projected_expr = - expand_exprlist(&distinct_on.input, distinct_on.select_expr)?; - validate_unique_names("Distinct", projected_expr.iter())?; - Ok(Transformed::yes(LogicalPlan::Distinct(Distinct::On( - DistinctOn::try_new( - distinct_on.on_expr, - projected_expr, - distinct_on.sort_expr, - distinct_on.input, - )?, - )))) - } - _ => Ok(Transformed::no(plan)), - } -} - -fn expand_exprlist(input: &LogicalPlan, expr: Vec) -> Result> { - let mut projected_expr = vec![]; - let input = find_base_plan(input); - for e in expr { - match e { - #[expect(deprecated)] - Expr::Wildcard { qualifier, options } => { - if let Some(qualifier) = qualifier { - let expanded = expand_qualified_wildcard( - &qualifier, - input.schema(), - Some(&options), - )?; - // If there is a REPLACE statement, replace that column with the given - // replace expression. Column name remains the same. - let replaced = if let Some(replace) = options.replace { - replace_columns(expanded, &replace)? - } else { - expanded - }; - projected_expr.extend(replaced); - } else { - let expanded = - expand_wildcard(input.schema(), input, Some(&options))?; - // If there is a REPLACE statement, replace that column with the given - // replace expression. Column name remains the same. - let replaced = if let Some(replace) = options.replace { - replace_columns(expanded, &replace)? - } else { - expanded - }; - projected_expr.extend(replaced); - } - } - // A workaround to handle the case when the column name is "*". - // We transform the expression to a Expr::Column through [Column::from_name] in many places. - // It would also convert the wildcard expression to a column expression with name "*". - Expr::Column(Column { - ref relation, - ref name, - // TODO Should we use these spans? - spans: _, - }) => { - if name.eq("*") { - if let Some(qualifier) = relation { - projected_expr.extend(expand_qualified_wildcard( - qualifier, - input.schema(), - None, - )?); - } else { - projected_expr.extend(expand_wildcard( - input.schema(), - input, - None, - )?); - } - } else { - projected_expr.push(e.clone()); - } - } - _ => projected_expr.push(e), - } - } - Ok(projected_expr) -} - -/// If there is a REPLACE statement in the projected expression in the form of -/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces -/// that column with the given replace expression. Column name remains the same. -/// Multiple REPLACEs are also possible with comma separations. -fn replace_columns( - mut exprs: Vec, - replace: &PlannedReplaceSelectItem, -) -> Result> { - for expr in exprs.iter_mut() { - if let Expr::Column(Column { name, .. }) = expr { - if let Some((_, new_expr)) = replace - .items() - .iter() - .zip(replace.expressions().iter()) - .find(|(item, _)| item.column_name.value == *name) - { - *expr = new_expr.clone().alias(name.clone()) - } - } - } - Ok(exprs) -} - -#[cfg(test)] -mod tests { - use arrow::datatypes::{DataType, Field, Schema}; - - use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan}; - use crate::Analyzer; - use datafusion_common::{JoinType, TableReference}; - use datafusion_expr::{ - col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder, - }; - - use super::*; - - fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> { - assert_analyzed_plan_eq_display_indent( - Arc::new(ExpandWildcardRule::new()), - plan, - expected, - ) - } - - #[test] - fn test_expand_wildcard() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![wildcard()])? - .build()?; - let expected = - "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_expand_qualified_wildcard() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![qualified_wildcard(TableReference::bare("test"))])? - .build()?; - let expected = - "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_expand_qualified_wildcard_in_subquery() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![qualified_wildcard(TableReference::bare("test"))])? - .build()?; - let plan = LogicalPlanBuilder::from(plan) - .project(vec![wildcard()])? - .build()?; - let expected = - "Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\ - \n Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_expand_wildcard_in_subquery() -> Result<()> { - let projection_a = LogicalPlanBuilder::from(test_table_scan()?) - .project(vec![col("a")])? - .build()?; - let subquery = LogicalPlanBuilder::from(projection_a) - .project(vec![wildcard()])? - .build()?; - let plan = LogicalPlanBuilder::from(test_table_scan()?) - .filter(in_subquery(col("a"), Arc::new(subquery)))? - .project(vec![wildcard()])? - .build()?; - let expected = "\ - Projection: test.a, test.b, test.c [a:UInt32, b:UInt32, c:UInt32]\ - \n Filter: test.a IN () [a:UInt32, b:UInt32, c:UInt32]\ - \n Subquery: [a:UInt32]\ - \n Projection: test.a [a:UInt32]\ - \n Projection: test.a [a:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_expand_wildcard_in_distinct_on() -> Result<()> { - let table_scan = test_table_scan()?; - let plan = LogicalPlanBuilder::from(table_scan) - .distinct_on(vec![col("a")], vec![wildcard()], None)? - .build()?; - let expected = "\ - DistinctOn: on_expr=[[test.a]], select_expr=[[test.a, test.b, test.c]], sort_expr=[[]] [a:UInt32, b:UInt32, c:UInt32]\ - \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_plan_eq(plan, expected) - } - - #[test] - fn test_subquery_schema() -> Result<()> { - let analyzer = Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]); - let options = ConfigOptions::default(); - let subquery = LogicalPlanBuilder::from(test_table_scan()?) - .project(vec![wildcard()])? - .build()?; - let plan = LogicalPlanBuilder::from(subquery) - .alias("sub")? - .project(vec![wildcard()])? - .build()?; - let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _| {})?; - for x in analyzed_plan.inputs() { - for field in x.schema().fields() { - assert_ne!(field.name(), "*"); - } - } - Ok(()) - } - - fn employee_schema() -> Schema { - Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("first_name", DataType::Utf8, false), - Field::new("last_name", DataType::Utf8, false), - Field::new("state", DataType::Utf8, false), - Field::new("salary", DataType::Int32, false), - ]) - } - - #[test] - fn plan_using_join_wildcard_projection() -> Result<()> { - let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?; - - let plan = table_scan(Some("t1"), &employee_schema(), None)? - .join_using(t2, JoinType::Inner, vec!["id"])? - .project(vec![wildcard()])? - .build()?; - - let expected = "Projection: *\ - \n Inner Join: Using t1.id = t2.id\ - \n TableScan: t1\ - \n TableScan: t2"; - - assert_eq!(expected, format!("{plan}")); - - let analyzer = Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]); - let options = ConfigOptions::default(); - - let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _| {})?; - - // id column should only show up once in projection - let expected = "Projection: t1.id, t1.first_name, t1.last_name, t1.state, t1.salary, t2.first_name, t2.last_name, t2.state, t2.salary\ - \n Inner Join: Using t1.id = t2.id\ - \n TableScan: t1\ - \n TableScan: t2"; - assert_eq!(expected, format!("{analyzed_plan}")); - - Ok(()) - } -} diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index 95781b395f3c..350e65e1e329 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -23,7 +23,8 @@ use crate::analyzer::AnalyzerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Column, Result}; -use datafusion_expr::{logical_plan::LogicalPlan, wildcard, Expr, LogicalPlanBuilder}; +use datafusion_expr::utils::expand_wildcard; +use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder}; /// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`] /// (DataFrame / ViewTable) @@ -92,7 +93,8 @@ fn generate_projection_expr( ))); } } else { - exprs.push(wildcard()); + let expanded = expand_wildcard(sub_plan.schema(), sub_plan, None)?; + exprs.extend(expanded); } Ok(exprs) } @@ -181,7 +183,7 @@ mod tests { let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?; let expected = "Filter: x.a = Int32(1)\ \n SubqueryAlias: x\ - \n Projection: *\ + \n Projection: y.a, y.b\ \n TableScan: y"; assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected) diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index c506616d142e..1d199f2faafc 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -28,7 +28,6 @@ use datafusion_common::Result; use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::{InvariantLevel, LogicalPlan}; -use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::inline_table_scan::InlineTableScan; use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction; use crate::analyzer::type_coercion::TypeCoercion; @@ -36,7 +35,6 @@ use crate::utils::log_plan; use self::function_rewrite::ApplyFunctionRewrites; -pub mod expand_wildcard_rule; pub mod function_rewrite; pub mod inline_table_scan; pub mod resolve_grouping_function; @@ -99,9 +97,6 @@ impl Analyzer { pub fn new() -> Self { let rules: Vec> = vec![ Arc::new(InlineTableScan::new()), - // Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule]. - Arc::new(ExpandWildcardRule::new()), - // [Expr::Wildcard] should be expanded before [TypeCoercion] Arc::new(ResolveGroupingFunction::new()), Arc::new(TypeCoercion::new()), ]; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index cab7308f6ff8..1d63d02bb941 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -175,7 +175,6 @@ initial_logical_plan 01)Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c 02)--TableScan: simple_explain_test logical_plan after inline_table_scan SAME TEXT AS ABOVE -logical_plan after expand_wildcard_rule SAME TEXT AS ABOVE logical_plan after resolve_grouping_function SAME TEXT AS ABOVE logical_plan after type_coercion SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index cc7efed419c2..3021d4e38f5b 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -use datafusion::config::ConfigOptions; -use datafusion::optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule; -use datafusion::optimizer::AnalyzerRule; use std::sync::Arc; use substrait::proto::expression_reference::ExprType; @@ -435,14 +432,10 @@ pub fn to_substrait_plan(plan: &LogicalPlan, state: &SessionState) -> Result