From 4910a4fd90e042f7e95859a1d1700196d4a4fb65 Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Fri, 20 Oct 2023 13:59:47 +1000 Subject: [PATCH 1/2] Add fields_index for searching column index --- datafusion/common/src/dfschema.rs | 70 ++++++++++++++++++++----- datafusion/core/src/physical_planner.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 4 +- 3 files changed, 61 insertions(+), 15 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b1aee41978c2..8b529086500b 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -18,7 +18,7 @@ //! DFSchema is an extended schema struct that DataFusion uses to provide support for //! fields with optional relation names. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryFrom; use std::fmt::{Display, Formatter}; use std::hash::Hash; @@ -42,6 +42,8 @@ pub type DFSchemaRef = Arc; pub struct DFSchema { /// Fields fields: Vec, + // For searching column id by field name + fields_index: BTreeMap>, /// Additional metadata in form of key value pairs metadata: HashMap, /// Stores functional dependencies in the schema. @@ -53,6 +55,7 @@ impl DFSchema { pub fn empty() -> Self { Self { fields: vec![], + fields_index: BTreeMap::new(), metadata: HashMap::new(), functional_dependencies: FunctionalDependencies::empty(), } @@ -104,9 +107,24 @@ impl DFSchema { )); } } + + let mut fields_index: BTreeMap> = BTreeMap::new(); + for (idx, field) in fields.iter().enumerate() { + let key = match field.qualifier { + Some(_) => field.name().to_owned(), + None => Column::from_qualified_name(field.name()).name, + }; + field.qualified_name(); + fields_index + .entry(key) + .and_modify(|vec| vec.push(idx)) + .or_insert(vec![idx]); + } + Ok(Self { fields, metadata, + fields_index, functional_dependencies: FunctionalDependencies::empty(), }) } @@ -161,6 +179,16 @@ impl DFSchema { }; if !duplicated_field { self.fields.push(field.clone()); + + let idx = self.fields.len() - 1; + let key = match field.qualifier { + Some(_) => field.name().to_owned(), + None => Column::from_qualified_name(field.name()).name, + }; + self.fields_index + .entry(key) + .and_modify(|vec| vec.push(idx)) + .or_insert(vec![idx]); } } self.metadata.extend(other_schema.metadata.clone()) @@ -208,32 +236,50 @@ impl DFSchema { qualifier: Option<&TableReference>, name: &str, ) -> Result> { - let mut matches = self - .fields + let key = match qualifier { + Some(_) => name.to_owned(), + None => Column::from_qualified_name(name).name, + }; + let Some(matched_fields_idx) = self.fields_index.get(&key) else { + return Ok(None); + }; + + let Some(qualifier) = qualifier else { + // field to lookup is unqualified, no need to compare qualifier + let mut matches = matched_fields_idx + .iter() + .map(|idx| (*idx, self.field(*idx))) + .filter(|(_, field)| field.name() == name) + .map(|(idx, _)| idx); + return Ok(matches.next()); + }; + + // field to lookup is qualified. + let mut matches = matched_fields_idx .iter() - .enumerate() - .filter(|(_, field)| match (qualifier, &field.qualifier) { + .map(|idx| (*idx, self.field(*idx))) + .filter(|(_, field)| match &field.qualifier { // field to lookup is qualified. // current field is qualified and not shared between relations, compare both // qualifier and name. - (Some(q), Some(field_q)) => { - q.resolved_eq(field_q) && field.name() == name + Some(field_qualifier) => { + qualifier.resolved_eq(field_qualifier) && field.name() == name } // field to lookup is qualified but current field is unqualified. - (Some(qq), None) => { + None => { // the original field may now be aliased with a name that matches the // original qualified name let column = Column::from_qualified_name(field.name()); match column { Column { - relation: Some(r), + relation: Some(column_qualifier), name: column_name, - } => &r == qq && column_name == name, + } => { + column_qualifier.resolved_eq(qualifier) && column_name == name + } _ => false, } } - // field to lookup is unqualified, no need to compare qualifier - (None, Some(_)) | (None, None) => field.name() == name, }) .map(|(idx, _)| idx); Ok(matches.next()) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 5a1fdcaee509..0c3c957228e2 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2326,7 +2326,7 @@ mod tests { dict_id: 0, \ dict_is_ordered: false, \ metadata: {} } }\ - ], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ + ], fields_index: {\"a\": [0]}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ ExecutionPlan schema: Schema { fields: [\ Field { \ name: \"b\", \ diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 5231dc869875..88423dd955fc 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -504,12 +504,12 @@ mod tests { assert_eq!( "Optimizer rule 'get table_scan rule' failed\ncaused by\nget table_scan rule\ncaused by\n\ Internal error: Failed due to generate a different schema, \ - original schema: DFSchema { fields: [], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ + original schema: DFSchema { fields: [], fields_index: {}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ new schema: DFSchema { fields: [\ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }], \ - metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }.\ + fields_index: {\"a\": [0], \"b\": [1], \"c\": [2]}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }.\ \nThis was likely caused by a bug in DataFusion's code \ and we would welcome that you file an bug report in our issue tracker", err.strip_backtrace() From 8675ea117a6d5e5126deecda2000d6a4c6d096ef Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Sat, 21 Oct 2023 21:50:56 +1000 Subject: [PATCH 2/2] Draft Column refactor --- datafusion/common/src/column.rs | 127 +++++++++++------- datafusion/common/src/dfschema.rs | 62 ++++----- datafusion/common/src/error.rs | 4 +- .../core/src/datasource/listing/helpers.rs | 3 +- .../core/src/datasource/listing/table.rs | 2 +- .../physical_plan/parquet/row_groups.rs | 6 +- datafusion/core/src/physical_planner.rs | 8 +- datafusion/expr/src/expr_rewriter/mod.rs | 12 +- datafusion/expr/src/expr_rewriter/order_by.rs | 5 +- datafusion/expr/src/expr_schema.rs | 4 +- datafusion/expr/src/logical_plan/builder.rs | 63 +++++---- datafusion/expr/src/utils.rs | 4 +- datafusion/optimizer/src/decorrelate.rs | 20 ++- datafusion/optimizer/src/push_down_filter.rs | 5 +- .../optimizer/src/push_down_projection.rs | 4 +- .../optimizer/src/scalar_subquery_to_join.rs | 6 +- datafusion/optimizer/src/utils.rs | 6 +- datafusion/physical-expr/src/planner.rs | 2 +- datafusion/proto/src/logical_plan/to_proto.rs | 4 +- datafusion/sql/src/expr/identifier.rs | 17 +-- datafusion/sql/src/expr/mod.rs | 12 +- datafusion/sql/src/planner.rs | 20 ++- datafusion/sql/src/select.rs | 7 +- datafusion/sql/src/utils.rs | 4 +- .../substrait/src/logical_plan/consumer.rs | 8 +- 25 files changed, 233 insertions(+), 182 deletions(-) diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index 2e729c128e73..13d8a9307250 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -27,11 +27,14 @@ use std::sync::Arc; /// A named reference to a qualified field in a schema. #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct Column { - /// relation/table reference. - pub relation: Option, - /// field/column name. - pub name: String, +pub enum Column { + Unqualified { + name: String, + }, + Qualified { + relation: OwnedTableReference, + name: String, + }, } impl Column { @@ -45,26 +48,23 @@ impl Column { relation: Option>, name: impl Into, ) -> Self { - Self { - relation: relation.map(|r| r.into()), - name: name.into(), + match relation { + Some(relation) => Self::Qualified { + relation: relation.into(), + name: name.into(), + }, + None => Self::Unqualified { name: name.into() }, } } /// Convenience method for when there is no qualifier pub fn new_unqualified(name: impl Into) -> Self { - Self { - relation: None, - name: name.into(), - } + Self::Unqualified { name: name.into() } } /// Create Column from unqualified name. pub fn from_name(name: impl Into) -> Self { - Self { - relation: None, - name: name.into(), - } + Self::Unqualified { name: name.into() } } fn from_idents(idents: &mut Vec) -> Option { @@ -95,7 +95,10 @@ impl Column { // identifiers will be treated as an unqualified column name _ => return None, }; - Some(Self { relation, name }) + match relation { + Some(relation) => Some(Self::Qualified { relation, name }), + None => Some(Self::Unqualified { name }), + } } /// Deserialize a fully qualified name string into a column @@ -103,44 +106,70 @@ impl Column { /// Treats the name as a SQL identifier. For example /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case) /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR` - pub fn from_qualified_name(flat_name: impl Into) -> Self { - let flat_name: &str = &flat_name.into(); - Self::from_idents(&mut parse_identifiers_normalized(flat_name, false)) - .unwrap_or_else(|| Self { - relation: None, - name: flat_name.to_owned(), - }) + pub fn from_qualified_name(qualified_name: impl Into) -> Self { + let qualified_name: &str = &qualified_name.into(); + Self::from_idents(&mut parse_identifiers_normalized(qualified_name, false)) + .unwrap_or_else(|| Self::new_unqualified(qualified_name.to_owned())) } /// Deserialize a fully qualified name string into a column preserving column text case - pub fn from_qualified_name_ignore_case(flat_name: impl Into) -> Self { - let flat_name: &str = &flat_name.into(); - Self::from_idents(&mut parse_identifiers_normalized(flat_name, true)) - .unwrap_or_else(|| Self { - relation: None, - name: flat_name.to_owned(), - }) + pub fn from_qualified_name_ignore_case(qualified_name: impl Into) -> Self { + let qualified_name: &str = &qualified_name.into(); + Self::from_idents(&mut parse_identifiers_normalized(qualified_name, true)) + .unwrap_or_else(|| Self::new_unqualified(qualified_name.to_owned())) + } + + pub fn relation(&self) -> Option<&OwnedTableReference> { + match self { + Self::Unqualified { .. } => None, + Self::Qualified { relation, .. } => Some(relation), + } + } + + pub fn unqualified_column(&self) -> Self { + Self::Unqualified { + name: self.unqualified_name(), + } + } + + pub fn with_new_qualifier( + &self, + qualifier: Option>, + ) -> Self { + match self { + Self::Unqualified { name } | Self::Qualified { name, .. } => { + Self::new(qualifier, name) + } + } + } + + /// Unqualified name string + pub fn unqualified_name(&self) -> String { + match self { + Self::Unqualified { name } | Self::Qualified { name, .. } => name.clone(), + } + } + + /// Serialize column into a qualified name string + pub fn qualified_name(&self) -> String { + match self { + Self::Unqualified { name } => name.clone(), + Self::Qualified { relation, name } => format!("{}.{}", relation, name), + } } /// Serialize column into a flat name string pub fn flat_name(&self) -> String { - match &self.relation { - Some(r) => format!("{}.{}", r, self.name), - None => self.name.clone(), - } + self.qualified_name() } /// Serialize column into a quoted flat name string pub fn quoted_flat_name(&self) -> String { - match &self.relation { - Some(r) => { - format!( - "{}.{}", - r.to_quoted_string(), - quote_identifier(self.name.as_str()) - ) + match self { + Self::Unqualified { name } => quote_identifier(name).to_string(), + Self::Qualified { relation, name } => { + format!("{}.{}", relation.to_quoted_string(), quote_identifier(name)) } - None => quote_identifier(&self.name).to_string(), } } @@ -172,12 +201,12 @@ impl Column { schemas: &[&Arc], using_columns: &[HashSet], ) -> Result { - if self.relation.is_some() { + if let Self::Qualified { .. } = self { return Ok(self); } for schema in schemas { - let fields = schema.fields_with_unqualified_name(&self.name); + let fields = schema.fields_with_unqualified_name(&self.unqualified_name()); match fields.len() { 0 => continue, 1 => { @@ -212,7 +241,7 @@ impl Column { } Err(DataFusionError::SchemaError(SchemaError::FieldNotFound { - field: Box::new(Column::new(self.relation.clone(), self.name)), + field: Box::new(self.clone()), valid_fields: schemas .iter() .flat_map(|s| s.fields().iter().map(|f| f.qualified_column())) @@ -261,14 +290,14 @@ impl Column { schemas: &[&[&DFSchema]], using_columns: &[HashSet], ) -> Result { - if self.relation.is_some() { + if let Self::Qualified { .. } = self { return Ok(self); } for schema_level in schemas { let fields = schema_level .iter() - .flat_map(|s| s.fields_with_unqualified_name(&self.name)) + .flat_map(|schema| schema.fields_with_unqualified_name(&self.flat_name())) .collect::>(); match fields.len() { 0 => continue, @@ -301,7 +330,7 @@ impl Column { // If not due to USING columns then due to ambiguous column name return Err(DataFusionError::SchemaError( SchemaError::AmbiguousReference { - field: Column::new_unqualified(self.name), + field: Column::new_unqualified(self.flat_name()), }, )); } diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 8b529086500b..940559486294 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -99,10 +99,7 @@ impl DFSchema { if unqualified_names.contains(name) { return Err(DataFusionError::SchemaError( SchemaError::AmbiguousReference { - field: Column { - relation: Some((*qualifier).clone()), - name: name.to_string(), - }, + field: Column::new(Some((*qualifier).clone()), name.to_string()), }, )); } @@ -112,7 +109,7 @@ impl DFSchema { for (idx, field) in fields.iter().enumerate() { let key = match field.qualifier { Some(_) => field.name().to_owned(), - None => Column::from_qualified_name(field.name()).name, + None => Column::from_qualified_name(field.name()).unqualified_name(), }; field.qualified_name(); fields_index @@ -183,7 +180,7 @@ impl DFSchema { let idx = self.fields.len() - 1; let key = match field.qualifier { Some(_) => field.name().to_owned(), - None => Column::from_qualified_name(field.name()).name, + None => Column::from_qualified_name(field.name()).unqualified_name(), }; self.fields_index .entry(key) @@ -238,7 +235,7 @@ impl DFSchema { ) -> Result> { let key = match qualifier { Some(_) => name.to_owned(), - None => Column::from_qualified_name(name).name, + None => Column::from_qualified_name(name).unqualified_name(), }; let Some(matched_fields_idx) = self.fields_index.get(&key) else { return Ok(None); @@ -271,8 +268,8 @@ impl DFSchema { // original qualified name let column = Column::from_qualified_name(field.name()); match column { - Column { - relation: Some(column_qualifier), + Column::Qualified { + relation: column_qualifier, name: column_name, } => { column_qualifier.resolved_eq(qualifier) && column_name == name @@ -286,14 +283,20 @@ impl DFSchema { } /// Find the index of the column with the given qualifier and name - pub fn index_of_column(&self, col: &Column) -> Result { - self.index_of_column_by_name(col.relation.as_ref(), &col.name)? - .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self)) + pub fn index_of_column(&self, column: &Column) -> Result { + self.index_of_column_by_name(column.relation(), &column.unqualified_name())? + .ok_or_else(|| { + field_not_found( + column.relation().map(|q| (*q).clone()), + &column.unqualified_name(), + self, + ) + }) } /// Check if the column is in the current schema - pub fn is_column_from_schema(&self, col: &Column) -> Result { - self.index_of_column_by_name(col.relation.as_ref(), &col.name) + pub fn is_column_from_schema(&self, column: &Column) -> Result { + self.index_of_column_by_name(column.relation(), &column.unqualified_name()) .map(|idx| idx.is_some()) } @@ -349,8 +352,7 @@ impl DFSchema { } else { Err(DataFusionError::SchemaError( SchemaError::AmbiguousReference { - field: Column { - relation: None, + field: Column::Unqualified { name: name.to_string(), }, }, @@ -375,9 +377,12 @@ impl DFSchema { /// Find the field with the given qualified column pub fn field_from_column(&self, column: &Column) -> Result<&DFField> { - match &column.relation { - Some(r) => self.field_with_qualified_name(r, &column.name), - None => self.field_with_unqualified_name(&column.name), + match column { + Column::Qualified { + relation: qualifier, + name, + } => self.field_with_qualified_name(qualifier, name), + Column::Unqualified { name } => self.field_with_unqualified_name(name), } } @@ -400,9 +405,12 @@ impl DFSchema { /// Find if the field exists with the given qualified column pub fn has_column(&self, column: &Column) -> bool { - match &column.relation { - Some(r) => self.has_column_with_qualified_name(r, &column.name), - None => self.has_column_with_unqualified_name(&column.name), + match column { + Column::Qualified { + relation: qualifier, + name, + } => self.has_column_with_qualified_name(qualifier, name), + Column::Unqualified { name } => self.has_column_with_unqualified_name(name), } } @@ -757,18 +765,12 @@ impl DFField { /// Builds a qualified column based on self pub fn qualified_column(&self) -> Column { - Column { - relation: self.qualifier.clone(), - name: self.field.name().to_string(), - } + Column::new(self.qualifier().cloned(), self.field.name()) } /// Builds an unqualified column based on self pub fn unqualified_column(&self) -> Column { - Column { - relation: None, - name: self.field.name().to_string(), - } + Column::new_unqualified(self.field.name()) } /// Get the optional qualifier diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index adf58e282ed9..51c88b048fa2 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -188,12 +188,12 @@ impl Display for SchemaError { ) } Self::AmbiguousReference { field } => { - if field.relation.is_some() { + if field.relation().is_some() { write!( f, "Schema contains qualified field name {} and unqualified field name {} which would be ambiguous", field.quoted_flat_name(), - quote_identifier(&field.name) + quote_identifier(&field.unqualified_name()) ) } else { write!( diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d6a0add9b253..a23ed8d8b5e1 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -55,7 +55,8 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool { let mut is_applicable = true; expr.apply(&mut |expr| { Ok(match expr { - Expr::Column(Column { ref name, .. }) => { + Expr::Column(Column::Unqualified { ref name }) + | Expr::Column(Column::Qualified { ref name, .. }) => { is_applicable &= col_names.contains(name); if is_applicable { VisitRecursion::Skip diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 05d8ba6c451e..a31e4e9cf6c8 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -684,7 +684,7 @@ impl ListingTable { .map(|expr| { if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr { if let Expr::Column(col) = expr.as_ref() { - let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?; + let expr = physical_plan::expressions::col(&col.unqualified_name(), self.table_schema.as_ref())?; Ok(PhysicalSortExpr { expr, options: SortOptions { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index c6e2c68d0211..62316b4c2c24 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -177,7 +177,7 @@ macro_rules! get_statistic { macro_rules! get_min_max_values { ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ let (_column_index, field) = - if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { + if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.unqualified_name()) { (v, f) } else { // Named column was not present @@ -191,7 +191,7 @@ macro_rules! get_min_max_values { $self.row_group_metadata .columns() .iter() - .find(|c| c.column_descr().name() == &$column.name) + .find(|c| c.column_descr().name() == &$column.unqualified_name()) .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None}) .map(|(stats, column_descr)| { @@ -213,7 +213,7 @@ macro_rules! get_null_count_values { .row_group_metadata .columns() .iter() - .find(|c| c.column_descr().name() == &$column.name) + .find(|c| c.column_descr().name() == &$column.unqualified_name()) { col.statistics().map(|s| s.null_count()) } else { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0c3c957228e2..bc72dc20ad65 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -125,7 +125,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { match e { Expr::Column(c) => { if is_first_expr { - Ok(c.name.clone()) + Ok(c.unqualified_name().clone()) } else { Ok(c.flat_name()) } @@ -1067,8 +1067,8 @@ impl DefaultPhysicalPlanner { let l = l.try_into_col()?; let r = r.try_into_col()?; Ok(( - Column::new(&l.name, left_df_schema.index_of_column(&l)?), - Column::new(&r.name, right_df_schema.index_of_column(&r)?), + Column::new(&l.unqualified_name(), left_df_schema.index_of_column(&l)?), + Column::new(&r.unqualified_name(), right_df_schema.index_of_column(&r)?), )) }) .collect::>()?; @@ -1228,7 +1228,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Unnest(Unnest { input, column, schema, options }) => { let input = self.create_initial_plan(input, session_state).await?; let column_exec = schema.index_of_column(column) - .map(|idx| Column::new(&column.name, idx))?; + .map(|idx| Column::new(&column.unqualified_name(), idx))?; let schema = SchemaRef::new(schema.as_ref().to_owned().into()); Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone()))) } diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 1f04c80833f0..d20ab3fbbac6 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -124,12 +124,8 @@ pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul pub fn unnormalize_col(expr: Expr) -> Expr { expr.transform(&|expr| { Ok({ - if let Expr::Column(c) = expr { - let col = Column { - relation: None, - name: c.name, - }; - Transformed::Yes(Expr::Column(col)) + if let Expr::Column(column) = expr { + Transformed::Yes(Expr::Column(column.unqualified_column())) } else { Transformed::No(expr) } @@ -145,9 +141,7 @@ pub fn create_col_from_scalar_expr( ) -> Result { match scalar_expr { Expr::Alias(Alias { name, .. }) => Ok(Column::new(Some(subqry_alias), name)), - Expr::Column(Column { relation: _, name }) => { - Ok(Column::new(Some(subqry_alias), name)) - } + Expr::Column(column) => Ok(column.with_new_qualifier(Some(subqry_alias))), _ => { let scalar_column = scalar_expr.display_name()?; Ok(Column::new(Some(subqry_alias), scalar_column)) diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index c87a724d5646..0643a694e012 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -110,10 +110,7 @@ fn rewrite_in_terms_of_projection( // for a column with the same "MIN(C2)", so translate there let name = normalized_expr.display_name()?; - let search_col = Expr::Column(Column { - relation: None, - name, - }); + let search_col = Expr::Column(Column::new_unqualified(name)); // look for the column named the same as this expr if let Some(found) = proj_exprs.iter().find(|a| expr_match(&search_col, a)) { diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 025b74eb5009..7881dbb652f7 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -289,8 +289,8 @@ impl ExprSchemable for Expr { fn to_field(&self, input_schema: &DFSchema) -> Result { match self { Expr::Column(c) => Ok(DFField::new( - c.relation.clone(), - &c.name, + c.relation().cloned(), + &c.unqualified_name(), self.get_type(input_schema)?, self.nullable(input_schema)?, ) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index cd50dbe79cfd..a187f31d37a3 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -744,16 +744,22 @@ impl LogicalPlanBuilder { let l = l.into(); let r = r.into(); - match (&l.relation, &r.relation) { + match (&l.relation(), &r.relation()) { (Some(lr), Some(rr)) => { - let l_is_left = - self.plan.schema().field_with_qualified_name(lr, &l.name); - let l_is_right = - right.schema().field_with_qualified_name(lr, &l.name); - let r_is_left = - self.plan.schema().field_with_qualified_name(rr, &r.name); - let r_is_right = - right.schema().field_with_qualified_name(rr, &r.name); + let l_is_left = self + .plan + .schema() + .field_with_qualified_name(lr, &l.unqualified_name()); + let l_is_right = right + .schema() + .field_with_qualified_name(lr, &l.unqualified_name()); + let r_is_left = self + .plan + .schema() + .field_with_qualified_name(rr, &r.unqualified_name()); + let r_is_right = right + .schema() + .field_with_qualified_name(rr, &r.unqualified_name()); match (l_is_left, l_is_right, r_is_left, r_is_right) { (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)), @@ -765,10 +771,13 @@ impl LogicalPlanBuilder { } } (Some(lr), None) => { - let l_is_left = - self.plan.schema().field_with_qualified_name(lr, &l.name); - let l_is_right = - right.schema().field_with_qualified_name(lr, &l.name); + let l_is_left = self + .plan + .schema() + .field_with_qualified_name(lr, &l.unqualified_name()); + let l_is_right = right + .schema() + .field_with_qualified_name(lr, &l.unqualified_name()); match (l_is_left, l_is_right) { (Ok(_), _) => (Ok(l), Self::normalize(&right, r)), @@ -780,10 +789,13 @@ impl LogicalPlanBuilder { } } (None, Some(rr)) => { - let r_is_left = - self.plan.schema().field_with_qualified_name(rr, &r.name); - let r_is_right = - right.schema().field_with_qualified_name(rr, &r.name); + let r_is_left = self + .plan + .schema() + .field_with_qualified_name(rr, &r.unqualified_name()); + let r_is_right = right + .schema() + .field_with_qualified_name(rr, &r.unqualified_name()); match (r_is_left, r_is_right) { (Ok(_), _) => (Ok(r), Self::normalize(&right, l)), @@ -1242,10 +1254,11 @@ pub fn project_with_column_index( Expr::Alias(Alias { ref name, .. }) if name != schema.field(i).name() => { e.unalias().alias(schema.field(i).name()) } - Expr::Column(Column { - relation: _, - ref name, - }) if name != schema.field(i).name() => e.alias(schema.field(i).name()), + Expr::Column(ref column) + if &column.unqualified_name() != schema.field(i).name() => + { + e.alias(schema.field(i).name()) + } Expr::Alias { .. } | Expr::Column { .. } => e, _ => e.alias(schema.field(i).name()), }) @@ -1850,8 +1863,8 @@ mod tests { match plan { Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference { field: - Column { - relation: Some(OwnedTableReference::Bare { table }), + Column::Qualified { + relation: OwnedTableReference::Bare { table }, name, }, })) => { @@ -1877,8 +1890,8 @@ mod tests { match plan { Err(DataFusionError::SchemaError(SchemaError::AmbiguousReference { field: - Column { - relation: Some(OwnedTableReference::Bare { table }), + Column::Qualified { + relation: OwnedTableReference::Bare { table }, name, }, })) => { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 5fc5b5b3f9c7..1e42add6999e 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -388,10 +388,10 @@ pub fn expand_wildcard( let mut out_column_names: HashSet = HashSet::new(); cols.into_iter() .filter_map(|c| { - if out_column_names.contains(&c.name) { + if out_column_names.contains(&c.unqualified_name()) { Some(c) } else { - out_column_names.insert(c.name); + out_column_names.insert(c.unqualified_name()); None } }) diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index b5cf73733896..85ad88bf7005 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -253,8 +253,10 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { ); let mut new_correlated_cols = BTreeSet::new(); for col in local_correlated_cols.iter() { - new_correlated_cols - .insert(Column::new(Some(alias.alias.clone()), col.name.clone())); + new_correlated_cols.insert(Column::new( + Some(alias.alias.clone()), + col.unqualified_name().clone(), + )); } self.correlated_subquery_cols_map .insert(plan.clone(), new_correlated_cols); @@ -409,8 +411,10 @@ fn proj_exprs_evaluation_result_on_empty_batch( ) -> Result<()> { for expr in proj_expr.iter() { let result_expr = expr.clone().transform_up(&|expr| { - if let Expr::Column(Column { name, .. }) = &expr { - if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) { + if let Expr::Column(column) = &expr { + if let Some(result_expr) = + input_expr_result_map_for_count_bug.get(&column.unqualified_name()) + { Ok(Transformed::Yes(result_expr.clone())) } else { Ok(Transformed::No(expr)) @@ -426,7 +430,7 @@ fn proj_exprs_evaluation_result_on_empty_batch( let result_expr = simplifier.simplify(result_expr)?; let expr_name = match expr { Expr::Alias(Alias { name, .. }) => name.to_string(), - Expr::Column(Column { relation: _, name }) => name.to_string(), + Expr::Column(column) => column.unqualified_name().to_string(), _ => expr.display_name()?, }; expr_result_map_for_count_bug.insert(expr_name, result_expr); @@ -442,8 +446,10 @@ fn filter_exprs_evaluation_result_on_empty_batch( expr_result_map_for_count_bug: &mut ExprResultMap, ) -> Result> { let result_expr = filter_expr.clone().transform_up(&|expr| { - if let Expr::Column(Column { name, .. }) = &expr { - if let Some(result_expr) = input_expr_result_map_for_count_bug.get(name) { + if let Expr::Column(column) = &expr { + if let Some(result_expr) = + input_expr_result_map_for_count_bug.get(&column.unqualified_name()) + { Ok(Transformed::Yes(result_expr.clone())) } else { Ok(Transformed::No(expr)) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4c5cd3ab2855..a17c7ca6260a 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -819,7 +819,10 @@ impl OptimizerRule for PushDownFilter { let mut push_predicates = vec![]; for expr in predicates { let cols = expr.to_columns()?; - if cols.iter().any(|c| prevent_cols.contains(&c.name)) { + if cols + .iter() + .any(|c| prevent_cols.contains(&c.unqualified_name())) + { keep_predicates.push(expr); } else { push_predicates.push(expr); diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 839f6b5bb8f6..275b0869b247 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -570,9 +570,9 @@ fn push_down_scan( let mut projection: BTreeSet = used_columns .iter() .filter(|c| { - c.relation.is_none() || c.relation.as_ref().unwrap() == &scan.table_name + c.relation().is_none() || c.relation().as_ref().unwrap() == &&scan.table_name }) - .map(|c| schema.index_of(&c.name)) + .map(|c| schema.index_of(&c.unqualified_name())) .filter_map(ArrowResult::ok) .collect(); diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 7ac0c25119c3..e81228fd155d 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -89,7 +89,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { rewrite_expr.clone().transform_up(&|expr| { if let Expr::Column(col) = &expr { if let Some(map_expr) = - expr_check_map.get(&col.name) + expr_check_map.get(&col.unqualified_name()) { Ok(Transformed::Yes(map_expr.clone())) } else { @@ -143,8 +143,8 @@ impl OptimizerRule for ScalarSubqueryToJoin { let new_expr = rewrite_expr.clone().transform_up(&|expr| { if let Expr::Column(col) = &expr { - if let Some(map_expr) = - expr_check_map.get(&col.name) + if let Some(map_expr) = expr_check_map + .get(&col.unqualified_name()) { Ok(Transformed::Yes(map_expr.clone())) } else { diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index a3e7e42875d7..21e0a3f84547 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -316,7 +316,11 @@ pub(crate) fn replace_qualified_name( let alias_cols: Vec = cols .iter() .map(|col| { - Column::from_qualified_name(format!("{}.{}", subquery_alias, col.name)) + Column::from_qualified_name(format!( + "{}.{}", + subquery_alias, + col.unqualified_name() + )) }) .collect(); let replace_map: HashMap<&Column, &Column> = diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 9a74c2ca64d1..ffd802f14fa5 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -68,7 +68,7 @@ pub fn create_physical_expr( )?), Expr::Column(c) => { let idx = input_dfschema.index_of_column(c)?; - Ok(Arc::new(Column::new(&c.name, idx))) + Ok(Arc::new(Column::new(&c.unqualified_name(), idx))) } Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), Expr::ScalarVariable(_, variable_names) => { diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 125ced032e20..1312bd074e2e 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -231,10 +231,10 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum { impl From for protobuf::Column { fn from(c: Column) -> Self { Self { - relation: c.relation.map(|relation| protobuf::ColumnRelation { + relation: c.relation().map(|relation| protobuf::ColumnRelation { relation: relation.to_string(), }), - name: c.name, + name: c.unqualified_name(), } } } diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 9f53ff579e7c..6174ddefbf0d 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -49,10 +49,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { match schema.field_with_unqualified_name(normalize_ident.as_str()) { Ok(_) => { // found a match without a qualified name, this is a inner table column - Ok(Expr::Column(Column { - relation: None, - name: normalize_ident, - })) + Ok(Expr::Column(Column::new_unqualified(normalize_ident))) } Err(_) => { // check the outer_query_schema and try to find a match @@ -66,16 +63,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { field.qualified_column(), )) } - Err(_) => Ok(Expr::Column(Column { - relation: None, - name: normalize_ident, - })), + Err(_) => { + Ok(Expr::Column(Column::new_unqualified(normalize_ident))) + } } } else { - Ok(Expr::Column(Column { - relation: None, - name: normalize_ident, - })) + Ok(Expr::Column(Column::new_unqualified(normalize_ident))) } } } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 26184834874a..d93d0f093789 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -128,22 +128,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Rewrite aliases which are not-complete (e.g. ones that only include only table qualifier in a schema.table qualified relation) fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr { match expr { - Expr::Column(col) => match &col.relation { + Expr::Column(col) => match &col.relation() { Some(q) => { match schema .fields() .iter() .find(|field| match field.qualifier() { Some(field_q) => { - field.name() == &col.name + field.name() == &col.unqualified_name() && field_q.to_string().ends_with(&format!(".{q}")) } _ => false, }) { - Some(df_field) => Expr::Column(Column { - relation: df_field.qualifier().cloned(), - name: df_field.name().clone(), - }), + Some(df_field) => Expr::Column(Column::new( + df_field.qualifier().cloned(), + df_field.name().clone(), + )), None => Expr::Column(col), } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index f7d8307d33a0..1116ca5d1e50 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -275,21 +275,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { find_column_exprs(exprs) .iter() .try_for_each(|col| match col { - Expr::Column(col) => match &col.relation { + Expr::Column(col) => match &col.relation() { Some(r) => { - schema.field_with_qualified_name(r, &col.name)?; + schema.field_with_qualified_name(r, &col.unqualified_name())?; Ok(()) } None => { - if !schema.fields_with_unqualified_name(&col.name).is_empty() { + if !schema + .fields_with_unqualified_name(&col.unqualified_name()) + .is_empty() + { Ok(()) } else { - Err(unqualified_field_not_found(col.name.as_str(), schema)) + Err(unqualified_field_not_found( + col.unqualified_name().as_str(), + schema, + )) } } } .map_err(|_: DataFusionError| { - field_not_found(col.relation.clone(), col.name.as_str(), schema) + field_not_found( + col.relation().cloned(), + col.unqualified_name().as_str(), + schema, + ) }), _ => internal_err!("Not a column"), }) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 2062afabfc1a..a16b78d7bfa2 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -24,7 +24,6 @@ use crate::utils::{ resolve_columns, resolve_positions_to_exprs, }; -use datafusion_common::Column; use datafusion_common::{ get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef, DataFusionError, Result, @@ -456,11 +455,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { replace: ReplaceSelectItem, ) -> Result> { for expr in exprs.iter_mut() { - if let Expr::Column(Column { name, .. }) = expr { + if let Expr::Column(column) = expr { if let Some(item) = replace .items .iter() - .find(|item| item.column_name.value == *name) + .find(|item| item.column_name.value == column.unqualified_name()) { let new_expr = self.sql_select_to_rex( SelectItem::UnnamedExpr(item.expr.clone()), @@ -471,7 +470,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .clone(); *expr = Expr::Alias(Alias { expr: Box::new(new_expr), - name: name.clone(), + name: column.unqualified_name().clone(), }); } } diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 28eaf241fa6f..e0eff7c7d62b 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -169,8 +169,8 @@ pub(crate) fn resolve_aliases_to_exprs( aliases: &HashMap, ) -> Result { expr.clone().transform_up(&|nested_expr| match nested_expr { - Expr::Column(c) if c.relation.is_none() => { - if let Some(aliased_expr) = aliases.get(&c.name) { + Expr::Column(c) if c.relation().is_none() => { + if let Some(aliased_expr) = aliases.get(&c.unqualified_name()) { Ok(Transformed::Yes(aliased_expr.clone())) } else { Ok(Transformed::No(Expr::Column(c))) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index ae65a2c7d94a..92430104a240 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -725,10 +725,10 @@ pub async fn from_substrait_rex( None => { let column = input_schema.field(x.field as usize).qualified_column(); - Ok(Arc::new(Expr::Column(Column { - relation: column.relation, - name: column.name, - }))) + Ok(Arc::new(Expr::Column(Column::new( + column.relation().cloned(), + column.unqualified_name(), + )))) } }, _ => not_impl_err!(