Skip to content

Commit ada016c

Browse files
committed
Fix UNION field nullability tracking
This commit fixes two bugs related to UNION handling - when constructing union plan nullability of the other union branch was ignored, thus resulting field could easily have incorrect nullability - when pruning/simplifying projects, in `recompute_schema` function there was similar logic, thus loosing nullability information even for correctly constructed Union plan node As a result, other optimizer logic (e.g. `expr_simplifier.rs`) could draw incorrect conclusions and thus lead to incorrect query results, as demonstrated with the attached SLT test.
1 parent 66b4da2 commit ada016c

File tree

3 files changed

+126
-31
lines changed

3 files changed

+126
-31
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
5454
use datafusion_common::{
5555
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
5656
plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
57-
FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
58-
UnnestOptions,
57+
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5958
};
6059
use datafusion_expr_common::type_coercion::binary::type_union_resolution;
6160

@@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>(
15181517
/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
15191518
/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
15201519
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1521-
if left_plan.schema().fields().len() != right_plan.schema().fields().len() {
1522-
return plan_err!(
1523-
"UNION queries have different number of columns: \
1524-
left has {} columns whereas right has {} columns",
1525-
left_plan.schema().fields().len(),
1526-
right_plan.schema().fields().len()
1527-
);
1528-
}
1529-
1530-
// Temporarily use the schema from the left input and later rely on the analyzer to
1531-
// coerce the two schemas into a common one.
1532-
1533-
// Functional Dependencies doesn't preserve after UNION operation
1534-
let schema = (**left_plan.schema()).clone();
1535-
let schema =
1536-
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
1537-
1538-
Ok(LogicalPlan::Union(Union {
1539-
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
1540-
schema,
1541-
}))
1520+
Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1521+
Arc::new(left_plan),
1522+
Arc::new(right_plan),
1523+
])?))
15421524
}
15431525

15441526
/// Create Projection

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -699,15 +699,13 @@ impl LogicalPlan {
699699
}))
700700
}
701701
LogicalPlan::Union(Union { inputs, schema }) => {
702-
let input_schema = inputs[0].schema();
703-
// If inputs are not pruned do not change schema
704-
// TODO this seems wrong (shouldn't we always use the schema of the input?)
705-
let schema = if schema.fields().len() == input_schema.fields().len() {
706-
Arc::clone(&schema)
702+
let first_input_schema = inputs[0].schema();
703+
if schema.fields().len() == first_input_schema.fields().len() {
704+
// If inputs are not pruned do not change schema
705+
Ok(LogicalPlan::Union(Union { inputs, schema }))
707706
} else {
708-
Arc::clone(input_schema)
709-
};
710-
Ok(LogicalPlan::Union(Union { inputs, schema }))
707+
Ok(LogicalPlan::Union(Union::try_new(inputs)?))
708+
}
711709
}
712710
LogicalPlan::Distinct(distinct) => {
713711
let distinct = match distinct {
@@ -2645,6 +2643,106 @@ pub struct Union {
26452643
pub schema: DFSchemaRef,
26462644
}
26472645

2646+
impl Union {
2647+
/// Constructs new Union instance deriving schema from inputs.
2648+
fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2649+
let schema = Self::derive_schema_from_inputs(&inputs, false)?;
2650+
Ok(Union { inputs, schema })
2651+
}
2652+
2653+
/// Constructs new Union instance deriving schema from inputs.
2654+
/// Inputs do not have to have matching types and produced schema will
2655+
/// take type from the first input.
2656+
pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2657+
let schema = Self::derive_schema_from_inputs(&inputs, true)?;
2658+
Ok(Union { inputs, schema })
2659+
}
2660+
2661+
/// Constructs new Union instance deriving schema from inputs.
2662+
///
2663+
/// `loose_types` if true, inputs do not have to have matching types and produced schema will
2664+
/// take type from the first input. TODO this is not necessarily reasonable behavior.
2665+
fn derive_schema_from_inputs(
2666+
inputs: &[Arc<LogicalPlan>],
2667+
loose_types: bool,
2668+
) -> Result<DFSchemaRef> {
2669+
if inputs.len() < 2 {
2670+
return plan_err!("UNION requires at least two inputs");
2671+
}
2672+
let first_schema = inputs[0].schema();
2673+
let fields_count = first_schema.fields().len();
2674+
for input in inputs {
2675+
if fields_count != input.schema().fields().len() {
2676+
return plan_err!(
2677+
"UNION queries have different number of columns: \
2678+
left has {} columns whereas right has {} columns",
2679+
fields_count,
2680+
input.schema().fields().len()
2681+
);
2682+
}
2683+
}
2684+
2685+
let union_fields = (0..fields_count)
2686+
.map(|i| {
2687+
let fields = inputs
2688+
.iter()
2689+
.map(|input| input.schema().field(i))
2690+
.collect::<Vec<_>>();
2691+
let first_field = fields[0];
2692+
let name = first_field.name();
2693+
let data_type = if loose_types {
2694+
// TODO apply type coercion here, or document why it's better to defer
2695+
// temporarily use the data type from the left input and later rely on the analyzer to
2696+
// coerce the two schemas into a common one.
2697+
first_field.data_type()
2698+
} else {
2699+
fields.iter().skip(1).try_fold(
2700+
first_field.data_type(),
2701+
|acc, field| {
2702+
if acc != field.data_type() {
2703+
return plan_err!(
2704+
"UNION field {i} have different type in inputs: \
2705+
left has {} whereas right has {}",
2706+
first_field.data_type(),
2707+
field.data_type()
2708+
);
2709+
}
2710+
Ok(acc)
2711+
},
2712+
)?
2713+
};
2714+
let nullable = fields.iter().any(|field| field.is_nullable());
2715+
let mut field = Field::new(name, data_type.clone(), nullable);
2716+
let field_metadata =
2717+
intersect_maps(fields.iter().map(|field| field.metadata()));
2718+
field.set_metadata(field_metadata);
2719+
// TODO reusing table reference from the first schema is probably wrong
2720+
let table_reference = first_schema.qualified_field(i).0.cloned();
2721+
Ok((table_reference, Arc::new(field)))
2722+
})
2723+
.collect::<Result<_>>()?;
2724+
let union_schema_metadata =
2725+
intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2726+
2727+
// Functional Dependencies doesn't preserve after UNION operation
2728+
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2729+
let schema = Arc::new(schema);
2730+
2731+
Ok(schema)
2732+
}
2733+
}
2734+
2735+
fn intersect_maps<'a>(
2736+
inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
2737+
) -> HashMap<String, String> {
2738+
let mut inputs = inputs.into_iter();
2739+
let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default();
2740+
for input in inputs {
2741+
merged.retain(|k, v| input.get(k) == Some(v));
2742+
}
2743+
merged
2744+
}
2745+
26482746
// Manual implementation needed because of `schema` field. Comparison excludes this field.
26492747
impl PartialOrd for Union {
26502748
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {

datafusion/sqllogictest/test_files/union.slt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,3 +836,18 @@ physical_plan
836836
# Clean up after the test
837837
statement ok
838838
drop table aggregate_test_100;
839+
840+
# test for https://github.com/apache/datafusion/issues/14352
841+
query TB rowsort
842+
SELECT
843+
a,
844+
a IS NOT NULL
845+
FROM (
846+
-- second column, even though it's not selected, was necessary to reproduce the bug linked above
847+
SELECT 'foo' AS a, 3 AS b
848+
UNION ALL
849+
SELECT NULL AS a, 4 AS b
850+
)
851+
----
852+
NULL false
853+
foo true

0 commit comments

Comments
 (0)