-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Core: Fix UNION field nullability tracking #14356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -699,15 +699,13 @@ impl LogicalPlan { | |||
})) | ||||
} | ||||
LogicalPlan::Union(Union { inputs, schema }) => { | ||||
let input_schema = inputs[0].schema(); | ||||
// If inputs are not pruned do not change schema | ||||
// TODO this seems wrong (shouldn't we always use the schema of the input?) | ||||
let schema = if schema.fields().len() == input_schema.fields().len() { | ||||
Arc::clone(&schema) | ||||
let first_input_schema = inputs[0].schema(); | ||||
if schema.fields().len() == first_input_schema.fields().len() { | ||||
// If inputs are not pruned do not change schema | ||||
Ok(LogicalPlan::Union(Union { inputs, schema })) | ||||
} else { | ||||
Arc::clone(input_schema) | ||||
}; | ||||
Ok(LogicalPlan::Union(Union { inputs, schema })) | ||||
Ok(LogicalPlan::Union(Union::try_new(inputs)?)) | ||||
} | ||||
} | ||||
LogicalPlan::Distinct(distinct) => { | ||||
let distinct = match distinct { | ||||
|
@@ -2645,6 +2643,107 @@ pub struct Union { | |||
pub schema: DFSchemaRef, | ||||
} | ||||
|
||||
impl Union { | ||||
/// Constructs new Union instance deriving schema from inputs. | ||||
fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> { | ||||
let schema = Self::derive_schema_from_inputs(&inputs, false)?; | ||||
Ok(Union { inputs, schema }) | ||||
} | ||||
|
||||
/// Constructs new Union instance deriving schema from inputs. | ||||
/// Inputs do not have to have matching types and produced schema will | ||||
/// take type from the first input. | ||||
// TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all. | ||||
pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another name for this one is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this one should be coercing types, but it's not doing this. It just takes a type from first UNION branch. I didn't know why this logic existed in this shape, but it felt intentional enough not to simply replace it in this PR. I would prefer it to be fixed later... Hence this function name to make the caller wonder what "loose types" mean. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this was noted once before. Thank you for making it more explicit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. -> #14380 |
||||
let schema = Self::derive_schema_from_inputs(&inputs, true)?; | ||||
Ok(Union { inputs, schema }) | ||||
} | ||||
|
||||
/// Constructs new Union instance deriving schema from inputs. | ||||
/// | ||||
/// `loose_types` if true, inputs do not have to have matching types and produced schema will | ||||
/// take type from the first input. TODO (<https://github.com/apache/datafusion/issues/14380>) this is not necessarily reasonable behavior. | ||||
fn derive_schema_from_inputs( | ||||
inputs: &[Arc<LogicalPlan>], | ||||
loose_types: bool, | ||||
) -> Result<DFSchemaRef> { | ||||
if inputs.len() < 2 { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there is already code that computes the coerced schema in the analyzer:
Can we reuse the same logic? Maybe we can move the coercion code here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the optimizer case, no coercion logic should be invoked, the types must match. For the variant which constructs new schema from some uncoerced inputs -- currently called "loose types" and currently not doing coercion to maintain behavior as it was -- yes, i agree this one could use the coerce_union_schema from the analyzer (perhaps moving the logic into here) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we can file a ticket describing what is desired and leave a comment in the code referring to that ticket There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Making sure I understand the use case. If I want to construct a UNION logical plan with different types that are coercible (be it by current builtin rules or future user-defined rules), then I would use the Then what exactly is the use case for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Correct.
"schema recompute" is an overloaded term There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||
return plan_err!("UNION requires at least two inputs"); | ||||
} | ||||
let first_schema = inputs[0].schema(); | ||||
let fields_count = first_schema.fields().len(); | ||||
for input in inputs.iter().skip(1) { | ||||
if fields_count != input.schema().fields().len() { | ||||
return plan_err!( | ||||
"UNION queries have different number of columns: \ | ||||
left has {} columns whereas right has {} columns", | ||||
fields_count, | ||||
input.schema().fields().len() | ||||
); | ||||
} | ||||
} | ||||
|
||||
let union_fields = (0..fields_count) | ||||
.map(|i| { | ||||
let fields = inputs | ||||
.iter() | ||||
.map(|input| input.schema().field(i)) | ||||
.collect::<Vec<_>>(); | ||||
let first_field = fields[0]; | ||||
let name = first_field.name(); | ||||
let data_type = if loose_types { | ||||
// TODO apply type coercion here, or document why it's better to defer | ||||
// temporarily use the data type from the left input and later rely on the analyzer to | ||||
// coerce the two schemas into a common one. | ||||
first_field.data_type() | ||||
} else { | ||||
fields.iter().skip(1).try_fold( | ||||
first_field.data_type(), | ||||
|acc, field| { | ||||
if acc != field.data_type() { | ||||
return plan_err!( | ||||
"UNION field {i} have different type in inputs: \ | ||||
left has {} whereas right has {}", | ||||
first_field.data_type(), | ||||
field.data_type() | ||||
); | ||||
} | ||||
Ok(acc) | ||||
}, | ||||
)? | ||||
}; | ||||
let nullable = fields.iter().any(|field| field.is_nullable()); | ||||
let mut field = Field::new(name, data_type.clone(), nullable); | ||||
let field_metadata = | ||||
intersect_maps(fields.iter().map(|field| field.metadata())); | ||||
field.set_metadata(field_metadata); | ||||
// TODO reusing table reference from the first schema is probably wrong | ||||
let table_reference = first_schema.qualified_field(i).0.cloned(); | ||||
Ok((table_reference, Arc::new(field))) | ||||
}) | ||||
.collect::<Result<_>>()?; | ||||
let union_schema_metadata = | ||||
intersect_maps(inputs.iter().map(|input| input.schema().metadata())); | ||||
|
||||
// Functional Dependencies doesn't preserve after UNION operation | ||||
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?; | ||||
let schema = Arc::new(schema); | ||||
|
||||
Ok(schema) | ||||
} | ||||
} | ||||
|
||||
fn intersect_maps<'a>( | ||||
inputs: impl IntoIterator<Item = &'a HashMap<String, String>>, | ||||
) -> HashMap<String, String> { | ||||
let mut inputs = inputs.into_iter(); | ||||
let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default(); | ||||
for input in inputs { | ||||
merged.retain(|k, v| input.get(k) == Some(v)); | ||||
} | ||||
merged | ||||
} | ||||
|
||||
// Manual implementation needed because of `schema` field. Comparison excludes this field. | ||||
impl PartialOrd for Union { | ||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||||
|
Uh oh!
There was an error while loading. Please reload this page.