Fix duplicate unqualified Field name (schema error) on join queries #15438
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
    let mut name_map = HashMap::new();
    let mut seen: HashSet<String> = HashSet::new();

    fields
        .into_iter()
        .map(|field| {
            let counter = name_map.entry(field.name().to_string()).or_insert(0);
            *counter += 1;
            if *counter > 1 {
                let new_name = format!("{}:{}", field.name(), *counter - 1);
                Field::new(new_name, field.data_type().clone(), field.is_nullable())
            } else {
                field.as_ref().clone()
This seems to be the root cause of the issue. When doing joins, the function requalify_sides_if_needed aliases the columns so that the resulting join schema, let in_join_schema = left.schema().join(right.schema())?;, can be created. However, if we had a query like:
select
*
from
first_agg
LEFT JOIN fourth_random_table ON first_agg.id = fourth_random_table.id
LEFT JOIN second_agg ON first_agg.id = second_agg.id
LEFT JOIN third_agg ON first_agg.id = third_agg.id
The first JOIN to be converted to a logical plan, LEFT JOIN third_agg ON first_agg.id = third_agg.id, will work: the join schema column names stay as they are, with an alias. The subsequent JOINs fail, however, because the consumer does the following for each JOIN:
- After handling the innermost join, the resulting join schema is [left.id] [right.id] ✅
- For the second join we "carry" the previous schema, so in requalify_sides_if_needed we would have [id, left.id] [id, right.id], and we would have to alias again -> [left.id, left.id] [right.id, right.id]. Because of this function we would end up with [left.id:1, left.id] [right.id:1, right.id] ✅
- On the outermost and final join the process is repeated: [id, left.id:1, left.id] [id, right.id:1, right.id] -> [left.id:1, left.id:1, left.id] [right.id:1, right.id:1, right.id]. Because id:1 is repeated under the current change_redundant_column algorithm, the query fails with Schema contains duplicate unqualified field name "id:1" 🟥
Moreover, we can observe that with just two levels of joins we get no error:
select
*
from
first_agg
LEFT JOIN fourth_random_table ON first_agg.id = fourth_random_table.id
LEFT JOIN second_agg ON first_agg.id = second_agg.id
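The collision described in this comment can be reproduced in isolation. The following is a minimal sketch, assuming the per-name counter logic from the diff hunk at the top of the thread, rewritten over plain strings instead of Arrow `Field`s. The names `rename_with_counter` and `first_duplicate` are hypothetical helpers for illustration and are not part of DataFusion.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for the counter-based renaming: the first occurrence
/// of a name is kept, and the n-th repeat becomes "name:(n-1)".
/// It never checks whether the generated name is already taken.
fn rename_with_counter(names: &[&str]) -> Vec<String> {
    let mut name_map: HashMap<String, usize> = HashMap::new();
    names
        .iter()
        .map(|name| {
            let counter = name_map.entry(name.to_string()).or_insert(0);
            *counter += 1;
            if *counter > 1 {
                format!("{}:{}", name, *counter - 1)
            } else {
                name.to_string()
            }
        })
        .collect()
}

/// Returns the first name that occurs more than once, if any.
fn first_duplicate(names: &[String]) -> Option<&str> {
    let mut seen = HashSet::new();
    for n in names {
        if !seen.insert(n.as_str()) {
            return Some(n.as_str());
        }
    }
    None
}
```

Calling `rename_with_counter(&["id", "id:1", "id"])`, which mirrors the unqualified names at the outermost join, renames the second `id` to `id:1` even though an `id:1` carried over from the previous join is already present. With only `["id", "id"]`, as in the two-level query, the output stays unique.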
if col.name() != matching_input_field.name() {
    return internal_err!("Input field name {} does not match with the projection expression {}",
        matching_input_field.name(),col.name())
}
let fixed_col = Column::new(col.name(), idx);
return Ok(Transformed::yes(Arc::new(fixed_col)))
}
If this check is skipped the query will still work, the same way it can be skipped here for the aggregate nodes schema check. Without this we would get the error: Input field name count(Int64(1)) does not match with the projection expression count(Int64(1)):1
Still, it would be nice to know if this is the correct approach.
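To make the failure mode concrete, here is a rough sketch of the kind of name check under discussion, written over plain string slices rather than DataFusion's `Column` and schema types. The function `check_column_name` and its signature are assumptions for illustration only; the error text follows the message quoted in the comment.

```rust
/// Hypothetical, simplified version of the projection check: the column
/// expression at index `idx` must carry the same name as the input field
/// at that index.
fn check_column_name(
    input_fields: &[&str],
    col_name: &str,
    idx: usize,
) -> Result<(), String> {
    // Look up the input field the column expression points at.
    let matching_input_field = input_fields
        .get(idx)
        .ok_or_else(|| format!("Column index {} is out of bounds", idx))?;
    if col_name != *matching_input_field {
        return Err(format!(
            "Input field name {} does not match with the projection expression {}",
            matching_input_field, col_name
        ));
    }
    Ok(())
}
```

With an input field named `count(Int64(1))` and a projection column named `count(Int64(1)):1`, this returns the error quoted above. Once both sides use the same name the check passes.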
I think we should revert this change. This check was helpful in catching many errors, especially while developing projection-related code (e.g. projection pushdown). Sorry for my delayed response, but @LiaCastaneda, could you please address the root cause of the issue and revert this change?
As you mentioned, the problem likely stems from inconsistent naming conventions between columns and fields. I recall encountering similar issues with aggregation functions in the past, and we resolved them by unifying the naming. I believe the correct fix shouldn’t require too much effort.
Hi, sorry about this. I'm trying to understand it: in that change, what I did was amend the issue by unifying the naming instead of skipping the check.
I checked how we build the LogicalPlan, and the input fields have the same names as the projection column expressions: both have count(Int64(1)):1, so I don't know where that field is being set to count(Int64(1)) for the physical input_schema. In the physical planner I also printed the logical input schema here and it appears as count(Int64(1)):1. I also noticed that the same function creates a physical Expr for the projection based on the logical input_schema and not the physical input_exec schema, which is why it fails later on the check.
I think one approach could be to move the check here and add an option to the runtime config, which we can get through the session_state, to skip it (set to false by default) so errors can still be caught while developing (IIUC this doesn't cause errors during execution); something similar was done for aggregate nodes. I was looking into your PR that adds that check, but apparently we don't have create_physical_name anymore after #11977.
Edit: I have a solution in mind based on what I mentioned above. Can this wait a couple of days instead of reverting the PR commit, since that would also revert the duplicate schema names fix? I will try opening another PR this week that brings the check back.
Nice! This one seems like a very hard-to-catch problem.
Thank you @LiaCastaneda and @gabotechs -- this PR looks good to me
field.as_ref().clone()
let base_name = field.name();
let count = name_map.entry(base_name.clone()).or_insert(0);
let mut new_name = base_name.clone();
I played around with trying to avoid this clone, but I could not come up with anything that was reasonable.
Since this function is only called when creating subqueries, I think it is fine:
https://github.com/search?q=repo%3Aapache%2Fdatafusion%20change_redundant_column&type=code
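The diff hunk this comment is attached to is cut off right after `new_name` is initialised. As a hedged sketch of where such code could go (an assumption about the approach, not a copy of the merged change), the version below keeps bumping the suffix until the candidate name is absent from a `seen` set. It works on plain strings, and `rename_until_unique` is a hypothetical name.

```rust
use std::collections::{HashMap, HashSet};

/// Sketch of a renaming pass that guarantees unique output names by
/// checking every candidate against the set of names already emitted.
fn rename_until_unique(names: &[&str]) -> Vec<String> {
    let mut name_map: HashMap<String, usize> = HashMap::new();
    let mut seen: HashSet<String> = HashSet::new();
    names
        .iter()
        .map(|base_name| {
            let count = name_map.entry(base_name.to_string()).or_insert(0);
            // This is the clone discussed in the comment above.
            let mut new_name = base_name.to_string();
            // Keep incrementing the suffix until the name is unused.
            while seen.contains(&new_name) {
                *count += 1;
                new_name = format!("{}:{}", base_name, *count);
            }
            seen.insert(new_name.clone());
            new_name
        })
        .collect()
}
```

For the input `["id", "id:1", "id"]`, the second `id` first tries `id:1`, finds it taken, and settles on `id:2`, so the output contains no duplicates.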
return internal_err!("Input field name {} does not match with the projection expression {}",
    matching_input_field.name(),col.name())
}
let fixed_col = Column::new(col.name(), idx);
FYI @berkaysynnada and @akurmustafa
sorry for being late :( https://github.com/apache/datafusion/pull/15438/files#discussion_r2025001167
Hi @LiaCastaneda -- I believe the CI has failed on this PR due to a change in the CI actions. Can you please merge the PR up to main, which I think will address the issue?
Thanks for the reviews @alamb and @gabotechs
…pache#15438) * Fix duplicate unqualified field name issue * Adjust Projection Properly * Add reproducer plan * Adjust comment * Set metadata to be the same as well * Fix substrait reproducer + Add test case * Format * Add explanation comment * Add test case to change_redundant_column
Which issue does this PR close?
Rationale for this change
As mentioned in the issue, when using the substrait consumer, queries with multiple JOINs fail because the renaming logic fails to make column names unique.
What changes are included in this PR?
Are these changes tested?
All tests pass, and a new test was added in consumer_integration.rs and in test_change_redundant_column.
Are there any user-facing changes?