Skip to content

Commit 8533371

Browse files
authored
Fix duplicate unqualified Field name (schema error) on join queries (apache#15438) (#15)
* Fix duplicate unqualified field name issue * Adjust Projection Properly * Add reproducer plan * Adjust comment * Set metadata to be the same as well * Fix substrait reproducer + Add test case * Format * Add explanation comment * Add test case to change_redundant_column
1 parent 6867bfe commit 8533371

File tree

5 files changed

+598
-14
lines changed

5 files changed

+598
-14
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,19 +1432,37 @@ impl ValuesFields {
14321432
}
14331433
}
14341434

1435+
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1436+
//
1437+
// Some field names might already come to this function with the count (number of times it appeared)
1438+
// as a sufix e.g. id:1, so there's still a chance of name collisions, for example,
1439+
// if these three fields passed to this function: "col:1", "col" and "col", the function
1440+
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1441+
// that's why we need the `seen` set, so the fields are always unique.
1442+
//
14351443
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
14361444
let mut name_map = HashMap::new();
1445+
let mut seen: HashSet<String> = HashSet::new();
1446+
14371447
fields
14381448
.into_iter()
14391449
.map(|field| {
1440-
let counter = name_map.entry(field.name().to_string()).or_insert(0);
1441-
*counter += 1;
1442-
if *counter > 1 {
1443-
let new_name = format!("{}:{}", field.name(), *counter - 1);
1444-
Field::new(new_name, field.data_type().clone(), field.is_nullable())
1445-
} else {
1446-
field.as_ref().clone()
1450+
let base_name = field.name();
1451+
let count = name_map.entry(base_name.clone()).or_insert(0);
1452+
let mut new_name = base_name.clone();
1453+
1454+
// Loop until we find a name that hasn't been used
1455+
while seen.contains(&new_name) {
1456+
*count += 1;
1457+
new_name = format!("{}:{}", base_name, count);
14471458
}
1459+
1460+
seen.insert(new_name.clone());
1461+
1462+
let mut modified_field =
1463+
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1464+
modified_field.set_metadata(field.metadata().clone());
1465+
modified_field
14481466
})
14491467
.collect()
14501468
}
@@ -2632,10 +2650,13 @@ mod tests {
26322650
let t1_field_1 = Field::new("a", DataType::Int32, false);
26332651
let t2_field_1 = Field::new("a", DataType::Int32, false);
26342652
let t2_field_3 = Field::new("a", DataType::Int32, false);
2653+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
26352654
let t1_field_2 = Field::new("b", DataType::Int32, false);
26362655
let t2_field_2 = Field::new("b", DataType::Int32, false);
26372656

2638-
let field_vec = vec![t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3];
2657+
let field_vec = vec![
2658+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2659+
];
26392660
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
26402661

26412662
assert_eq!(
@@ -2646,6 +2667,7 @@ mod tests {
26462667
Field::new("b", DataType::Int32, false),
26472668
Field::new("b:1", DataType::Int32, false),
26482669
Field::new("a:2", DataType::Int32, false),
2670+
Field::new("a:1:1", DataType::Int32, false),
26492671
]
26502672
);
26512673
Ok(())

datafusion/physical-expr/src/equivalence/projection.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::PhysicalExpr;
2222

2323
use arrow::datatypes::SchemaRef;
2424
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25-
use datafusion_common::{internal_err, Result};
25+
use datafusion_common::Result;
2626

2727
/// Stores the mapping between source expressions and target expressions for a
2828
/// projection.
@@ -66,9 +66,9 @@ impl ProjectionMapping {
6666
let idx = col.index();
6767
let matching_input_field = input_schema.field(idx);
6868
if col.name() != matching_input_field.name() {
69-
return internal_err!("Input field name {} does not match with the projection expression {}",
70-
matching_input_field.name(),col.name())
71-
}
69+
let fixed_col = Column::new(col.name(), idx);
70+
return Ok(Transformed::yes(Arc::new(fixed_col)));
71+
}
7272
let matching_input_column =
7373
Column::new(matching_input_field.name(), idx);
7474
Ok(Transformed::yes(Arc::new(matching_input_column)))

datafusion/substrait/src/logical_plan/consumer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,8 +1804,7 @@ fn requalify_sides_if_needed(
18041804
})
18051805
}) {
18061806
// These names have no connection to the original plan, but they'll make the columns
1807-
// (mostly) unique. There may be cases where this still causes duplicates, if either left
1808-
// or right side itself contains duplicate names with different qualifiers.
1807+
// (mostly) unique.
18091808
Ok((
18101809
left.alias(TableReference::bare("left"))?,
18111810
right.alias(TableReference::bare("right"))?,

datafusion/substrait/tests/cases/consumer_integration.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,33 @@ mod tests {
483483
Ok(())
484484
}
485485

486+
#[tokio::test]
487+
async fn test_multiple_joins() -> Result<()> {
488+
let plan_str = test_plan_to_string("multiple_joins.json").await?;
489+
assert_eq!(
490+
plan_str,
491+
"Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third\
492+
\n Left Join: left.id = right.id\
493+
\n SubqueryAlias: left\
494+
\n Left Join: left.id = right.id\
495+
\n SubqueryAlias: left\
496+
\n Left Join: left.id = right.id\
497+
\n SubqueryAlias: left\
498+
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
499+
\n Values: (Int64(1)), (Int64(2))\
500+
\n SubqueryAlias: right\
501+
\n Aggregate: groupBy=[[id, category]], aggr=[[]]\
502+
\n Values: (Int64(1), Utf8(\"info\")), (Int64(2), Utf8(\"low\"))\
503+
\n SubqueryAlias: right\
504+
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
505+
\n Values: (Int64(1)), (Int64(2))\
506+
\n SubqueryAlias: right\
507+
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
508+
\n Values: (Int64(1)), (Int64(2))"
509+
);
510+
Ok(())
511+
}
512+
486513
#[tokio::test]
487514
async fn test_select_window_count() -> Result<()> {
488515
let plan_str = test_plan_to_string("select_window_count.substrait.json").await?;

0 commit comments

Comments
 (0)