Skip to content

Commit 95fe304

Browse files
committed
use btree to search in DFSchema
1 parent a0c5aff commit 95fe304

File tree

3 files changed

+194
-42
lines changed

3 files changed

+194
-42
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 186 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
1919
//! fields with optional relation names.
2020
21-
use std::collections::{HashMap, HashSet};
21+
use std::borrow::Cow;
22+
use std::cmp::Ordering;
23+
use std::collections::btree_map::Entry;
24+
use std::collections::{BTreeMap, HashMap, HashSet};
2225
use std::convert::TryFrom;
2326
use std::fmt::{Display, Formatter};
2427
use std::hash::Hash;
@@ -35,11 +38,122 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
3538
/// A reference-counted reference to a `DFSchema`.
3639
pub type DFSchemaRef = Arc<DFSchema>;
3740

41+
/// A [`FieldReference`] is a multi-part identifier (path) to a field:
42+
/// a field name plus an optional qualifier, which may require further resolution.
///
/// It is the key type of the `fields_index` map kept by `DFSchema`.
43+
#[derive(Debug, Clone, PartialEq, Eq)]
44+
struct FieldReference<'a> {
45+
/// The field name (borrowed or owned)
46+
name: Cow<'a, str>,
47+
/// Optional qualifier (usually a table or relation name); `None` for an unqualified field
48+
qualifier: Option<TableReference<'a>>,
49+
}
50+
51+
/// Partial ordering that always defers to the total order defined by the
/// [`Ord`] implementation, so the two can never disagree.
impl<'a> PartialOrd for FieldReference<'a> {
52+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
53+
Some(self.cmp(other))
54+
}
55+
}
56+
57+
/// Total order used for the sorted `fields_index` keys.
///
/// References are compared by field name first, then by the qualifier's
/// table, schema and catalog components, in that order. For each qualifier
/// component, a reference that lacks the component sorts before one that
/// has it, so an unqualified reference is the smallest key for its name.
impl<'a> Ord for FieldReference<'a> {
58+
fn cmp(&self, other: &Self) -> Ordering {
59+
// Fast path: references that are equal under `PartialEq` are `Equal`.
if self == other {
60+
return Ordering::Equal;
61+
}
62+
63+
// Primary sort key: the field name.
match self.field().cmp(other.field()) {
64+
Ordering::Less => return Ordering::Less,
65+
Ordering::Greater => return Ordering::Greater,
66+
Ordering::Equal => {}
67+
}
68+
69+
// Same name: compare the table component; a missing table sorts first.
match (self.table(), other.table()) {
70+
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
71+
Ordering::Less => return Ordering::Less,
72+
Ordering::Greater => return Ordering::Greater,
73+
Ordering::Equal => {}
74+
},
75+
(Some(_), None) => return Ordering::Greater,
76+
(None, Some(_)) => return Ordering::Less,
77+
_ => {}
78+
}
79+
80+
// Same name and table: compare the schema component; a missing schema sorts first.
match (self.schema(), other.schema()) {
81+
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
82+
Ordering::Less => return Ordering::Less,
83+
Ordering::Greater => return Ordering::Greater,
84+
Ordering::Equal => {}
85+
},
86+
(Some(_), None) => return Ordering::Greater,
87+
(None, Some(_)) => return Ordering::Less,
88+
_ => {}
89+
}
90+
91+
// Same name, table and schema: compare the catalog component; a missing catalog sorts first.
match (self.catalog(), other.catalog()) {
92+
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
93+
Ordering::Less => return Ordering::Less,
94+
Ordering::Greater => return Ordering::Greater,
95+
Ordering::Equal => {}
96+
},
97+
(Some(_), None) => return Ordering::Greater,
98+
(None, Some(_)) => return Ordering::Less,
99+
_ => {}
100+
}
101+
102+
// Every compared component matched.
Ordering::Equal
103+
}
104+
}
105+
106+
/// A [`FieldReference`] with the `'static` lifetime, i.e. one that
107+
/// owns its underlying strings instead of borrowing them.
108+
type OwnedFieldReference = FieldReference<'static>;
109+
110+
impl<'a> FieldReference<'a> {
111+
/// Convenience constructor for a [`FieldReference`] from a field name
/// (anything convertible into `Cow<str>`) and an optional qualifier.
112+
pub fn new(
113+
name: impl Into<Cow<'a, str>>,
114+
qualifier: Option<TableReference<'a>>,
115+
) -> Self {
116+
Self {
117+
name: name.into(),
118+
qualifier,
119+
}
120+
}
121+
122+
/// Compare with another [`FieldReference`] as if both are resolved.
123+
///
/// The names must be equal. The qualifiers are compared (with the qualifier's
124+
/// own `resolved_eq`) only when both references have one; if either reference
/// is unqualified, the qualifier is ignored and the comparison succeeds on name alone.
125+
pub fn resolved_eq(&self, other: &Self) -> bool {
126+
self.name == other.name
127+
&& match (&self.qualifier, &other.qualifier) {
128+
(Some(lhs), Some(rhs)) => lhs.resolved_eq(rhs),
129+
// At least one side is unqualified: nothing to compare.
_ => true,
130+
}
131+
}
132+
133+
/// The field name.
fn field(&self) -> &str {
134+
&self.name
135+
}
136+
137+
/// The table component of the qualifier, or `None` when unqualified.
fn table(&self) -> Option<&str> {
138+
self.qualifier.as_ref().map(|q| q.table())
139+
}
140+
141+
/// The schema component of the qualifier, or `None` when the reference is
/// unqualified or the qualifier reports no schema.
fn schema(&self) -> Option<&str> {
142+
self.qualifier.as_ref().and_then(|q| q.schema())
143+
}
144+
145+
/// The catalog component of the qualifier, or `None` when the reference is
/// unqualified or the qualifier reports no catalog.
fn catalog(&self) -> Option<&str> {
146+
self.qualifier.as_ref().and_then(|q| q.catalog())
147+
}
148+
}
149+
38150
/// DFSchema wraps an Arrow schema and adds relation names
39151
#[derive(Debug, Clone, PartialEq, Eq)]
40152
pub struct DFSchema {
41153
/// Fields
42154
fields: Vec<DFField>,
155+
/// Index from each field's (name, qualifier) reference to its positions in `fields`
156+
fields_index: BTreeMap<OwnedFieldReference, Vec<usize>>,
43157
/// Additional metadata in form of key value pairs
44158
metadata: HashMap<String, String>,
45159
/// Stores functional dependencies in the schema.
@@ -51,6 +165,7 @@ impl DFSchema {
51165
pub fn empty() -> Self {
52166
Self {
53167
fields: vec![],
168+
fields_index: BTreeMap::new(),
54169
metadata: HashMap::new(),
55170
functional_dependencies: FunctionalDependencies::empty(),
56171
}
@@ -102,8 +217,12 @@ impl DFSchema {
102217
));
103218
}
104219
}
220+
221+
let fields_index = build_index(&fields);
222+
105223
Ok(Self {
106224
fields,
225+
fields_index,
107226
metadata,
108227
functional_dependencies: FunctionalDependencies::empty(),
109228
})
@@ -159,6 +278,20 @@ impl DFSchema {
159278
};
160279
if !duplicated_field {
161280
self.fields.push(field.clone());
281+
let idx = self.fields.len() - 1;
282+
283+
let field_ref = OwnedFieldReference::new(
284+
field.name().clone(),
285+
field.qualifier().map(|q| q.to_owned_reference()),
286+
);
287+
match self.fields_index.entry(field_ref) {
288+
Entry::Vacant(entry) => {
289+
entry.insert(vec![idx]);
290+
}
291+
Entry::Occupied(mut entry) => {
292+
entry.get_mut().push(idx);
293+
}
294+
}
162295
}
163296
}
164297
self.metadata.extend(other_schema.metadata.clone())
@@ -206,34 +339,15 @@ impl DFSchema {
206339
qualifier: Option<&TableReference>,
207340
name: &str,
208341
) -> Result<Option<usize>> {
342+
let field_ref = FieldReference::new(name, qualifier.cloned());
209343
let mut matches = self
210-
.fields
211-
.iter()
212-
.enumerate()
213-
.filter(|(_, field)| match (qualifier, &field.qualifier) {
214-
// field to lookup is qualified.
215-
// current field is qualified and not shared between relations, compare both
216-
// qualifier and name.
217-
(Some(q), Some(field_q)) => {
218-
q.resolved_eq(field_q) && field.name() == name
219-
}
220-
// field to lookup is qualified but current field is unqualified.
221-
(Some(qq), None) => {
222-
// the original field may now be aliased with a name that matches the
223-
// original qualified name
224-
let column = Column::from_qualified_name(field.name());
225-
match column {
226-
Column {
227-
relation: Some(r),
228-
name: column_name,
229-
} => &r == qq && column_name == name,
230-
_ => false,
231-
}
232-
}
233-
// field to lookup is unqualified, no need to compare qualifier
234-
(None, Some(_)) | (None, None) => field.name() == name,
344+
.fields_index
345+
.range(field_ref..)
346+
.take_while(|(q, _indices)| {
347+
q.resolved_eq(&FieldReference::new(name, qualifier.cloned()))
235348
})
236-
.map(|(idx, _)| idx);
349+
.flat_map(|(_q, indices)| indices)
350+
.copied();
237351
Ok(matches.next())
238352
}
239353

@@ -272,9 +386,11 @@ impl DFSchema {
272386

273387
/// Find all fields whose name matches the given unqualified name,
/// whatever their qualifier
274388
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
275-
self.fields
276-
.iter()
277-
.filter(|field| field.name() == name)
389+
// Index keys order by name first, and an unqualified key sorts before any
// qualified key with the same name, so `(name, None)` is the lowest
// possible key for `name`; scan forward until the name changes.
self.fields_index
390+
.range(FieldReference::new(name, None)..)
391+
.take_while(|(q, _indices)| q.field() == name)
392+
.flat_map(|(_q, indices)| indices)
393+
.map(|idx| self.field(*idx))
278394
.collect()
279395
}
280396

@@ -454,25 +570,31 @@ impl DFSchema {
454570

455571
/// Strip the qualifier from every field in the schema
456572
pub fn strip_qualifiers(self) -> Self {
573+
let fields: Vec<DFField> = self
574+
.fields
575+
.into_iter()
576+
.map(|f| f.strip_qualifier())
577+
.collect();
578+
// Index keys include the qualifier, so rebuild the index for the new fields.
let fields_index = build_index(&fields);
457579
DFSchema {
458-
fields: self
459-
.fields
460-
.into_iter()
461-
.map(|f| f.strip_qualifier())
462-
.collect(),
580+
fields,
581+
fields_index,
463582
..self
464583
}
465584
}
466585

467586
/// Replace the qualifier of every field in the schema with the given value
468587
pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) -> Self {
469588
let qualifier = qualifier.into();
589+
let fields: Vec<DFField> = self
590+
.fields
591+
.into_iter()
592+
.map(|f| DFField::from_qualified(qualifier.clone(), f.field))
593+
.collect();
594+
// Index keys include the qualifier, so rebuild the index for the new fields.
let fields_index = build_index(&fields);
470595
DFSchema {
471-
fields: self
472-
.fields
473-
.into_iter()
474-
.map(|f| DFField::from_qualified(qualifier.clone(), f.field))
475-
.collect(),
596+
fields,
597+
fields_index,
476598
..self
477599
}
478600
}
@@ -496,6 +618,30 @@ impl DFSchema {
496618
}
497619
}
498620

621+
/// Build the lookup index for a list of fields.
///
/// Each field is turned into an owned (name, qualifier) reference and mapped
/// to its position in `fields`. Fields that produce the same reference share
/// one entry, whose positions are recorded in the order the fields appear.
fn build_index(fields: &[DFField]) -> BTreeMap<OwnedFieldReference, Vec<usize>> {
622+
let mut index = BTreeMap::new();
623+
let iter = fields
624+
.iter()
625+
.map(|field| {
626+
OwnedFieldReference::new(
627+
field.name().clone(),
628+
field.qualifier().map(|q| q.to_owned_reference()),
629+
)
630+
})
631+
.enumerate();
632+
for (idx, field) in iter {
633+
match index.entry(field) {
634+
// First field with this reference: start a new position list.
Entry::Vacant(entry) => {
635+
entry.insert(vec![idx]);
636+
}
637+
// Reference already indexed: append this position.
Entry::Occupied(mut entry) => {
638+
entry.get_mut().push(idx);
639+
}
640+
}
641+
}
642+
index
643+
}
644+
499645
impl From<DFSchema> for Schema {
500646
/// Convert DFSchema into a Schema
501647
fn from(df_schema: DFSchema) -> Self {

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2327,7 +2327,9 @@ mod tests {
23272327
dict_id: 0, \
23282328
dict_is_ordered: false, \
23292329
metadata: {} } }\
2330-
], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
2330+
], fields_index: {\
2331+
FieldReference { name: \"a\", qualifier: None }: [0]\
2332+
}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
23312333
ExecutionPlan schema: Schema { fields: [\
23322334
Field { \
23332335
name: \"b\", \

datafusion/optimizer/src/optimizer.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,11 +499,15 @@ mod tests {
499499
assert_eq!(
500500
"Optimizer rule 'get table_scan rule' failed\ncaused by\nget table_scan rule\ncaused by\n\
501501
Internal error: Failed due to generate a different schema, \
502-
original schema: DFSchema { fields: [], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
502+
original schema: DFSchema { fields: [], fields_index: {}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
503503
new schema: DFSchema { fields: [\
504504
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
505505
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
506506
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }], \
507+
fields_index: {\
508+
FieldReference { name: \"a\", qualifier: Some(Bare { table: \"test\" }) }: [0], \
509+
FieldReference { name: \"b\", qualifier: Some(Bare { table: \"test\" }) }: [1], \
510+
FieldReference { name: \"c\", qualifier: Some(Bare { table: \"test\" }) }: [2]}, \
507511
metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }.\
508512
\nThis was likely caused by a bug in DataFusion's code \
509513
and we would welcome that you file an bug report in our issue tracker",

0 commit comments

Comments
 (0)