Skip to content

Commit 8675ea1

Browse files
author
Evgeny Maruschenko
committed
Draft Column refactor
1 parent 4910a4f commit 8675ea1

File tree

25 files changed

+233
-182
lines changed

25 files changed

+233
-182
lines changed

datafusion/common/src/column.rs

Lines changed: 78 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ use std::sync::Arc;
2727

2828
/// A named reference to a qualified field in a schema.
2929
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
30-
pub struct Column {
31-
/// relation/table reference.
32-
pub relation: Option<OwnedTableReference>,
33-
/// field/column name.
34-
pub name: String,
30+
pub enum Column {
31+
Unqualified {
32+
name: String,
33+
},
34+
Qualified {
35+
relation: OwnedTableReference,
36+
name: String,
37+
},
3538
}
3639

3740
impl Column {
@@ -45,26 +48,23 @@ impl Column {
4548
relation: Option<impl Into<OwnedTableReference>>,
4649
name: impl Into<String>,
4750
) -> Self {
48-
Self {
49-
relation: relation.map(|r| r.into()),
50-
name: name.into(),
51+
match relation {
52+
Some(relation) => Self::Qualified {
53+
relation: relation.into(),
54+
name: name.into(),
55+
},
56+
None => Self::Unqualified { name: name.into() },
5157
}
5258
}
5359

5460
/// Convenience method for when there is no qualifier
5561
pub fn new_unqualified(name: impl Into<String>) -> Self {
56-
Self {
57-
relation: None,
58-
name: name.into(),
59-
}
62+
Self::Unqualified { name: name.into() }
6063
}
6164

6265
/// Create Column from unqualified name.
6366
pub fn from_name(name: impl Into<String>) -> Self {
64-
Self {
65-
relation: None,
66-
name: name.into(),
67-
}
67+
Self::Unqualified { name: name.into() }
6868
}
6969

7070
fn from_idents(idents: &mut Vec<String>) -> Option<Self> {
@@ -95,52 +95,81 @@ impl Column {
9595
// identifiers will be treated as an unqualified column name
9696
_ => return None,
9797
};
98-
Some(Self { relation, name })
98+
match relation {
99+
Some(relation) => Some(Self::Qualified { relation, name }),
100+
None => Some(Self::Unqualified { name }),
101+
}
99102
}
100103

101104
/// Deserialize a fully qualified name string into a column
102105
///
103106
/// Treats the name as a SQL identifier. For example
104107
/// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
105108
/// whereas the quoted form `"foo.BAR"` would be parsed to a reference to a column named `foo.BAR`
106-
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
107-
let flat_name: &str = &flat_name.into();
108-
Self::from_idents(&mut parse_identifiers_normalized(flat_name, false))
109-
.unwrap_or_else(|| Self {
110-
relation: None,
111-
name: flat_name.to_owned(),
112-
})
109+
pub fn from_qualified_name(qualified_name: impl Into<String>) -> Self {
110+
let qualified_name: &str = &qualified_name.into();
111+
Self::from_idents(&mut parse_identifiers_normalized(qualified_name, false))
112+
.unwrap_or_else(|| Self::new_unqualified(qualified_name.to_owned()))
113113
}
114114

115115
/// Deserialize a fully qualified name string into a column preserving column text case
116-
pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
117-
let flat_name: &str = &flat_name.into();
118-
Self::from_idents(&mut parse_identifiers_normalized(flat_name, true))
119-
.unwrap_or_else(|| Self {
120-
relation: None,
121-
name: flat_name.to_owned(),
122-
})
116+
pub fn from_qualified_name_ignore_case(qualified_name: impl Into<String>) -> Self {
117+
let qualified_name: &str = &qualified_name.into();
118+
Self::from_idents(&mut parse_identifiers_normalized(qualified_name, true))
119+
.unwrap_or_else(|| Self::new_unqualified(qualified_name.to_owned()))
120+
}
121+
122+
pub fn relation(&self) -> Option<&OwnedTableReference> {
123+
match self {
124+
Self::Unqualified { .. } => None,
125+
Self::Qualified { relation, .. } => Some(relation),
126+
}
127+
}
128+
129+
pub fn unqualified_column(&self) -> Self {
130+
Self::Unqualified {
131+
name: self.unqualified_name(),
132+
}
133+
}
134+
135+
pub fn with_new_qualifier(
136+
&self,
137+
qualifier: Option<impl Into<OwnedTableReference>>,
138+
) -> Self {
139+
match self {
140+
Self::Unqualified { name } | Self::Qualified { name, .. } => {
141+
Self::new(qualifier, name)
142+
}
143+
}
144+
}
145+
146+
/// Unqualified name string
147+
pub fn unqualified_name(&self) -> String {
148+
match self {
149+
Self::Unqualified { name } | Self::Qualified { name, .. } => name.clone(),
150+
}
151+
}
152+
153+
/// Serialize column into a qualified name string
154+
pub fn qualified_name(&self) -> String {
155+
match self {
156+
Self::Unqualified { name } => name.clone(),
157+
Self::Qualified { relation, name } => format!("{}.{}", relation, name),
158+
}
123159
}
124160

125161
/// Serialize column into a flat name string
126162
pub fn flat_name(&self) -> String {
127-
match &self.relation {
128-
Some(r) => format!("{}.{}", r, self.name),
129-
None => self.name.clone(),
130-
}
163+
self.qualified_name()
131164
}
132165

133166
/// Serialize column into a quoted flat name string
134167
pub fn quoted_flat_name(&self) -> String {
135-
match &self.relation {
136-
Some(r) => {
137-
format!(
138-
"{}.{}",
139-
r.to_quoted_string(),
140-
quote_identifier(self.name.as_str())
141-
)
168+
match self {
169+
Self::Unqualified { name } => quote_identifier(name).to_string(),
170+
Self::Qualified { relation, name } => {
171+
format!("{}.{}", relation.to_quoted_string(), quote_identifier(name))
142172
}
143-
None => quote_identifier(&self.name).to_string(),
144173
}
145174
}
146175

@@ -172,12 +201,12 @@ impl Column {
172201
schemas: &[&Arc<DFSchema>],
173202
using_columns: &[HashSet<Column>],
174203
) -> Result<Self> {
175-
if self.relation.is_some() {
204+
if let Self::Qualified { .. } = self {
176205
return Ok(self);
177206
}
178207

179208
for schema in schemas {
180-
let fields = schema.fields_with_unqualified_name(&self.name);
209+
let fields = schema.fields_with_unqualified_name(&self.unqualified_name());
181210
match fields.len() {
182211
0 => continue,
183212
1 => {
@@ -212,7 +241,7 @@ impl Column {
212241
}
213242

214243
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
215-
field: Box::new(Column::new(self.relation.clone(), self.name)),
244+
field: Box::new(self.clone()),
216245
valid_fields: schemas
217246
.iter()
218247
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
@@ -261,14 +290,14 @@ impl Column {
261290
schemas: &[&[&DFSchema]],
262291
using_columns: &[HashSet<Column>],
263292
) -> Result<Self> {
264-
if self.relation.is_some() {
293+
if let Self::Qualified { .. } = self {
265294
return Ok(self);
266295
}
267296

268297
for schema_level in schemas {
269298
let fields = schema_level
270299
.iter()
271-
.flat_map(|s| s.fields_with_unqualified_name(&self.name))
300+
.flat_map(|schema| schema.fields_with_unqualified_name(&self.flat_name()))
272301
.collect::<Vec<_>>();
273302
match fields.len() {
274303
0 => continue,
@@ -301,7 +330,7 @@ impl Column {
301330
// If not due to USING columns then due to ambiguous column name
302331
return Err(DataFusionError::SchemaError(
303332
SchemaError::AmbiguousReference {
304-
field: Column::new_unqualified(self.name),
333+
field: Column::new_unqualified(self.flat_name()),
305334
},
306335
));
307336
}

datafusion/common/src/dfschema.rs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,7 @@ impl DFSchema {
9999
if unqualified_names.contains(name) {
100100
return Err(DataFusionError::SchemaError(
101101
SchemaError::AmbiguousReference {
102-
field: Column {
103-
relation: Some((*qualifier).clone()),
104-
name: name.to_string(),
105-
},
102+
field: Column::new(Some((*qualifier).clone()), name.to_string()),
106103
},
107104
));
108105
}
@@ -112,7 +109,7 @@ impl DFSchema {
112109
for (idx, field) in fields.iter().enumerate() {
113110
let key = match field.qualifier {
114111
Some(_) => field.name().to_owned(),
115-
None => Column::from_qualified_name(field.name()).name,
112+
None => Column::from_qualified_name(field.name()).unqualified_name(),
116113
};
117114
field.qualified_name();
118115
fields_index
@@ -183,7 +180,7 @@ impl DFSchema {
183180
let idx = self.fields.len() - 1;
184181
let key = match field.qualifier {
185182
Some(_) => field.name().to_owned(),
186-
None => Column::from_qualified_name(field.name()).name,
183+
None => Column::from_qualified_name(field.name()).unqualified_name(),
187184
};
188185
self.fields_index
189186
.entry(key)
@@ -238,7 +235,7 @@ impl DFSchema {
238235
) -> Result<Option<usize>> {
239236
let key = match qualifier {
240237
Some(_) => name.to_owned(),
241-
None => Column::from_qualified_name(name).name,
238+
None => Column::from_qualified_name(name).unqualified_name(),
242239
};
243240
let Some(matched_fields_idx) = self.fields_index.get(&key) else {
244241
return Ok(None);
@@ -271,8 +268,8 @@ impl DFSchema {
271268
// original qualified name
272269
let column = Column::from_qualified_name(field.name());
273270
match column {
274-
Column {
275-
relation: Some(column_qualifier),
271+
Column::Qualified {
272+
relation: column_qualifier,
276273
name: column_name,
277274
} => {
278275
column_qualifier.resolved_eq(qualifier) && column_name == name
@@ -286,14 +283,20 @@ impl DFSchema {
286283
}
287284

288285
/// Find the index of the column with the given qualifier and name
289-
pub fn index_of_column(&self, col: &Column) -> Result<usize> {
290-
self.index_of_column_by_name(col.relation.as_ref(), &col.name)?
291-
.ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
286+
pub fn index_of_column(&self, column: &Column) -> Result<usize> {
287+
self.index_of_column_by_name(column.relation(), &column.unqualified_name())?
288+
.ok_or_else(|| {
289+
field_not_found(
290+
column.relation().map(|q| (*q).clone()),
291+
&column.unqualified_name(),
292+
self,
293+
)
294+
})
292295
}
293296

294297
/// Check if the column is in the current schema
295-
pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> {
296-
self.index_of_column_by_name(col.relation.as_ref(), &col.name)
298+
pub fn is_column_from_schema(&self, column: &Column) -> Result<bool> {
299+
self.index_of_column_by_name(column.relation(), &column.unqualified_name())
297300
.map(|idx| idx.is_some())
298301
}
299302

@@ -349,8 +352,7 @@ impl DFSchema {
349352
} else {
350353
Err(DataFusionError::SchemaError(
351354
SchemaError::AmbiguousReference {
352-
field: Column {
353-
relation: None,
355+
field: Column::Unqualified {
354356
name: name.to_string(),
355357
},
356358
},
@@ -375,9 +377,12 @@ impl DFSchema {
375377

376378
/// Find the field with the given qualified column
377379
pub fn field_from_column(&self, column: &Column) -> Result<&DFField> {
378-
match &column.relation {
379-
Some(r) => self.field_with_qualified_name(r, &column.name),
380-
None => self.field_with_unqualified_name(&column.name),
380+
match column {
381+
Column::Qualified {
382+
relation: qualifier,
383+
name,
384+
} => self.field_with_qualified_name(qualifier, name),
385+
Column::Unqualified { name } => self.field_with_unqualified_name(name),
381386
}
382387
}
383388

@@ -400,9 +405,12 @@ impl DFSchema {
400405

401406
/// Find if the field exists with the given qualified column
402407
pub fn has_column(&self, column: &Column) -> bool {
403-
match &column.relation {
404-
Some(r) => self.has_column_with_qualified_name(r, &column.name),
405-
None => self.has_column_with_unqualified_name(&column.name),
408+
match column {
409+
Column::Qualified {
410+
relation: qualifier,
411+
name,
412+
} => self.has_column_with_qualified_name(qualifier, name),
413+
Column::Unqualified { name } => self.has_column_with_unqualified_name(name),
406414
}
407415
}
408416

@@ -757,18 +765,12 @@ impl DFField {
757765

758766
/// Builds a qualified column based on self
759767
pub fn qualified_column(&self) -> Column {
760-
Column {
761-
relation: self.qualifier.clone(),
762-
name: self.field.name().to_string(),
763-
}
768+
Column::new(self.qualifier().cloned(), self.field.name())
764769
}
765770

766771
/// Builds an unqualified column based on self
767772
pub fn unqualified_column(&self) -> Column {
768-
Column {
769-
relation: None,
770-
name: self.field.name().to_string(),
771-
}
773+
Column::new_unqualified(self.field.name())
772774
}
773775

774776
/// Get the optional qualifier

datafusion/common/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,12 @@ impl Display for SchemaError {
188188
)
189189
}
190190
Self::AmbiguousReference { field } => {
191-
if field.relation.is_some() {
191+
if field.relation().is_some() {
192192
write!(
193193
f,
194194
"Schema contains qualified field name {} and unqualified field name {} which would be ambiguous",
195195
field.quoted_flat_name(),
196-
quote_identifier(&field.name)
196+
quote_identifier(&field.unqualified_name())
197197
)
198198
} else {
199199
write!(

datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
5555
let mut is_applicable = true;
5656
expr.apply(&mut |expr| {
5757
Ok(match expr {
58-
Expr::Column(Column { ref name, .. }) => {
58+
Expr::Column(Column::Unqualified { ref name })
59+
| Expr::Column(Column::Qualified { ref name, .. }) => {
5960
is_applicable &= col_names.contains(name);
6061
if is_applicable {
6162
VisitRecursion::Skip

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ impl ListingTable {
684684
.map(|expr| {
685685
if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
686686
if let Expr::Column(col) = expr.as_ref() {
687-
let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
687+
let expr = physical_plan::expressions::col(&col.unqualified_name(), self.table_schema.as_ref())?;
688688
Ok(PhysicalSortExpr {
689689
expr,
690690
options: SortOptions {

0 commit comments

Comments
 (0)