Skip to content

Commit eef461f

Browse files
LiaCastanedaNirnay Roy
authored and
Nirnay Roy
committed
Fix duplicate unqualified Field name (schema error) on join queries (apache#15438)
* Fix duplicate unqualified field name issue * Adjust Projection Properly * Add reproducer plan * Adjust comment * Set metadata to be the same as well * Fix substrait reproducer + Add test case * Format * Add explanation comment * Add test case to change_redundant_column
1 parent 7275c95 commit eef461f

File tree

5 files changed

+598
-14
lines changed

5 files changed

+598
-14
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,19 +1468,37 @@ impl ValuesFields {
14681468
}
14691469
}
14701470

1471+
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1472+
//
1473+
// Some field names might already come to this function with the count (number of times it appeared)
1474+
// as a sufix e.g. id:1, so there's still a chance of name collisions, for example,
1475+
// if these three fields passed to this function: "col:1", "col" and "col", the function
1476+
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1477+
// that's why we need the `seen` set, so the fields are always unique.
1478+
//
14711479
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
14721480
let mut name_map = HashMap::new();
1481+
let mut seen: HashSet<String> = HashSet::new();
1482+
14731483
fields
14741484
.into_iter()
14751485
.map(|field| {
1476-
let counter = name_map.entry(field.name().to_string()).or_insert(0);
1477-
*counter += 1;
1478-
if *counter > 1 {
1479-
let new_name = format!("{}:{}", field.name(), *counter - 1);
1480-
Field::new(new_name, field.data_type().clone(), field.is_nullable())
1481-
} else {
1482-
field.as_ref().clone()
1486+
let base_name = field.name();
1487+
let count = name_map.entry(base_name.clone()).or_insert(0);
1488+
let mut new_name = base_name.clone();
1489+
1490+
// Loop until we find a name that hasn't been used
1491+
while seen.contains(&new_name) {
1492+
*count += 1;
1493+
new_name = format!("{}:{}", base_name, count);
14831494
}
1495+
1496+
seen.insert(new_name.clone());
1497+
1498+
let mut modified_field =
1499+
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1500+
modified_field.set_metadata(field.metadata().clone());
1501+
modified_field
14841502
})
14851503
.collect()
14861504
}
@@ -2730,10 +2748,13 @@ mod tests {
27302748
let t1_field_1 = Field::new("a", DataType::Int32, false);
27312749
let t2_field_1 = Field::new("a", DataType::Int32, false);
27322750
let t2_field_3 = Field::new("a", DataType::Int32, false);
2751+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
27332752
let t1_field_2 = Field::new("b", DataType::Int32, false);
27342753
let t2_field_2 = Field::new("b", DataType::Int32, false);
27352754

2736-
let field_vec = vec![t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3];
2755+
let field_vec = vec![
2756+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2757+
];
27372758
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
27382759

27392760
assert_eq!(
@@ -2744,6 +2765,7 @@ mod tests {
27442765
Field::new("b", DataType::Int32, false),
27452766
Field::new("b:1", DataType::Int32, false),
27462767
Field::new("a:2", DataType::Int32, false),
2768+
Field::new("a:1:1", DataType::Int32, false),
27472769
]
27482770
);
27492771
Ok(())

datafusion/physical-expr/src/equivalence/projection.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::PhysicalExpr;
2222

2323
use arrow::datatypes::SchemaRef;
2424
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25-
use datafusion_common::{internal_err, Result};
25+
use datafusion_common::Result;
2626

2727
/// Stores the mapping between source expressions and target expressions for a
2828
/// projection.
@@ -66,9 +66,9 @@ impl ProjectionMapping {
6666
let idx = col.index();
6767
let matching_input_field = input_schema.field(idx);
6868
if col.name() != matching_input_field.name() {
69-
return internal_err!("Input field name {} does not match with the projection expression {}",
70-
matching_input_field.name(),col.name())
71-
}
69+
let fixed_col = Column::new(col.name(), idx);
70+
return Ok(Transformed::yes(Arc::new(fixed_col)));
71+
}
7272
let matching_input_column =
7373
Column::new(matching_input_field.name(), idx);
7474
Ok(Transformed::yes(Arc::new(matching_input_column)))

datafusion/substrait/src/logical_plan/consumer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,8 +1835,7 @@ fn requalify_sides_if_needed(
18351835
})
18361836
}) {
18371837
// These names have no connection to the original plan, but they'll make the columns
1838-
// (mostly) unique. There may be cases where this still causes duplicates, if either left
1839-
// or right side itself contains duplicate names with different qualifiers.
1838+
// (mostly) unique.
18401839
Ok((
18411840
left.alias(TableReference::bare("left"))?,
18421841
right.alias(TableReference::bare("right"))?,

datafusion/substrait/tests/cases/consumer_integration.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,33 @@ mod tests {
519519
Ok(())
520520
}
521521

522+
#[tokio::test]
523+
async fn test_multiple_joins() -> Result<()> {
524+
let plan_str = test_plan_to_string("multiple_joins.json").await?;
525+
assert_eq!(
526+
plan_str,
527+
"Projection: left.count(Int64(1)) AS count_first, left.category, left.count(Int64(1)):1 AS count_second, right.count(Int64(1)) AS count_third\
528+
\n Left Join: left.id = right.id\
529+
\n SubqueryAlias: left\
530+
\n Left Join: left.id = right.id\
531+
\n SubqueryAlias: left\
532+
\n Left Join: left.id = right.id\
533+
\n SubqueryAlias: left\
534+
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
535+
\n Values: (Int64(1)), (Int64(2))\
536+
\n SubqueryAlias: right\
537+
\n Aggregate: groupBy=[[id, category]], aggr=[[]]\
538+
\n Values: (Int64(1), Utf8(\"info\")), (Int64(2), Utf8(\"low\"))\
539+
\n SubqueryAlias: right\
540+
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
541+
\n Values: (Int64(1)), (Int64(2))\
542+
\n SubqueryAlias: right\
543+
\n Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]\
544+
\n Values: (Int64(1)), (Int64(2))"
545+
);
546+
Ok(())
547+
}
548+
522549
#[tokio::test]
523550
async fn test_select_window_count() -> Result<()> {
524551
let plan_str = test_plan_to_string("select_window_count.substrait.json").await?;

0 commit comments

Comments
 (0)