Skip to content

Commit 2790539

Browse files
author
Evgeny Maruschenko
committed
Add fields_index for searching column index
1 parent 37d6bf0 commit 2790539

File tree

3 files changed

+63
-15
lines changed

3 files changed

+63
-15
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ pub type DFSchemaRef = Arc<DFSchema>;
4242
pub struct DFSchema {
4343
/// Fields
4444
fields: Vec<DFField>,
45+
// For searching column id by field name
46+
fields_index: HashMap<String, Vec<usize>>,
4547
/// Additional metadata in form of key value pairs
4648
metadata: HashMap<String, String>,
4749
/// Stores functional dependencies in the schema.
@@ -53,6 +55,7 @@ impl DFSchema {
5355
pub fn empty() -> Self {
5456
Self {
5557
fields: vec![],
58+
fields_index: HashMap::new(),
5659
metadata: HashMap::new(),
5760
functional_dependencies: FunctionalDependencies::empty(),
5861
}
@@ -104,9 +107,24 @@ impl DFSchema {
104107
));
105108
}
106109
}
110+
111+
let mut fields_index: HashMap<String, Vec<usize>> = HashMap::new();
112+
for (idx, field) in fields.iter().enumerate() {
113+
let unqualified_name = match field.qualifier {
114+
Some(_) => field.name().to_owned(),
115+
None => Column::from_qualified_name(field.name()).name,
116+
};
117+
field.qualified_name();
118+
fields_index
119+
.entry(unqualified_name)
120+
.and_modify(|vec| vec.push(idx))
121+
.or_insert(vec![idx]);
122+
}
123+
107124
Ok(Self {
108125
fields,
109126
metadata,
127+
fields_index,
110128
functional_dependencies: FunctionalDependencies::empty(),
111129
})
112130
}
@@ -161,6 +179,16 @@ impl DFSchema {
161179
};
162180
if !duplicated_field {
163181
self.fields.push(field.clone());
182+
183+
let idx = self.fields.len() - 1;
184+
let unqualified_name = match field.qualifier {
185+
Some(_) => field.name().to_owned(),
186+
None => Column::from_qualified_name(field.name()).name,
187+
};
188+
self.fields_index
189+
.entry(unqualified_name)
190+
.and_modify(|vec| vec.push(idx))
191+
.or_insert(vec![idx]);
164192
}
165193
}
166194
self.metadata.extend(other_schema.metadata.clone())
@@ -208,32 +236,50 @@ impl DFSchema {
208236
qualifier: Option<&TableReference>,
209237
name: &str,
210238
) -> Result<Option<usize>> {
211-
let mut matches = self
212-
.fields
239+
let unqualified_name = match qualifier {
240+
Some(_) => name.to_owned(),
241+
None => Column::from_qualified_name(name).name,
242+
};
243+
let Some(matched_fields_idx) = self.fields_index.get(&unqualified_name) else {
244+
return Ok(None);
245+
};
246+
247+
let Some(qualifier) = qualifier else {
248+
// field to lookup is unqualified, no need to compare qualifier
249+
let mut matches = matched_fields_idx
250+
.iter()
251+
.map(|idx| (*idx, self.field(*idx)))
252+
.filter(|(_, field)| field.name() == name)
253+
.map(|(idx, _)| idx);
254+
return Ok(matches.next());
255+
};
256+
257+
// field to lookup is qualified.
258+
let mut matches = matched_fields_idx
213259
.iter()
214-
.enumerate()
215-
.filter(|(_, field)| match (qualifier, &field.qualifier) {
260+
.map(|idx| (*idx, self.field(*idx)))
261+
.filter(|(_, field)| match &field.qualifier {
216262
// field to lookup is qualified.
217263
// current field is qualified and not shared between relations, compare both
218264
// qualifier and name.
219-
(Some(q), Some(field_q)) => {
220-
q.resolved_eq(field_q) && field.name() == name
265+
Some(field_qualifier) => {
266+
qualifier.resolved_eq(field_qualifier) && field.name() == name
221267
}
222268
// field to lookup is qualified but current field is unqualified.
223-
(Some(qq), None) => {
269+
None => {
224270
// the original field may now be aliased with a name that matches the
225271
// original qualified name
226272
let column = Column::from_qualified_name(field.name());
227273
match column {
228274
Column {
229-
relation: Some(r),
275+
relation: Some(column_qualifier),
230276
name: column_name,
231-
} => &r == qq && column_name == name,
277+
} => {
278+
column_qualifier.resolved_eq(qualifier) && column_name == name
279+
}
232280
_ => false,
233281
}
234282
}
235-
// field to lookup is unqualified, no need to compare qualifier
236-
(None, Some(_)) | (None, None) => field.name() == name,
237283
})
238284
.map(|(idx, _)| idx);
239285
Ok(matches.next())

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2326,7 +2326,7 @@ mod tests {
23262326
dict_id: 0, \
23272327
dict_is_ordered: false, \
23282328
metadata: {} } }\
2329-
], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
2329+
], fields_index: {\"a\": [0]}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
23302330
ExecutionPlan schema: Schema { fields: [\
23312331
Field { \
23322332
name: \"b\", \

datafusion/optimizer/src/optimizer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,9 +438,11 @@ fn assert_schema_is_the_same(
438438

439439
if !equivalent {
440440
let e = DataFusionError::Internal(format!(
441-
"Failed due to generate a different schema, original schema: {:?}, new schema: {:?}",
442-
prev_plan.schema(),
443-
new_plan.schema()
441+
"Failed due to generate a different schema\
442+
, original schema: DFSchema {{ fields: {:?}, metadata: {:?}, functional_dependencies: {:?} }}\
443+
, new schema: DFSchema {{ fields: {:?}, metadata: {:?}, functional_dependencies: {:?} }}",
444+
prev_plan.schema().fields(), prev_plan.schema().metadata(), prev_plan.schema().functional_dependencies(),
445+
new_plan.schema().fields(), new_plan.schema().metadata(), new_plan.schema().functional_dependencies(),
444446
));
445447
Err(DataFusionError::Context(
446448
String::from(rule_name),

0 commit comments

Comments
 (0)