Skip to content

Commit 66ff8d5

Browse files
committed
handle join equality specially
1 parent 853270c commit 66ff8d5

File tree

2 files changed

+58
-13
lines changed

2 files changed

+58
-13
lines changed

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -498,10 +498,7 @@ impl LogicalPlan {
498498
LogicalPlan::Copy(_) => Ok(self),
499499
LogicalPlan::Values(Values { schema, values }) => {
500500
// todo it isn't clear why the schema is not recomputed here
501-
Ok(LogicalPlan::Values(Values {
502-
schema,
503-
values
504-
}))
501+
Ok(LogicalPlan::Values(Values { schema, values }))
505502
}
506503
LogicalPlan::Filter(Filter { predicate, input }) => {
507504
// todo: should this logic be moved to Filter::try_new?

datafusion/optimizer/src/analyzer/type_coercion.rs

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,17 @@ fn analyze_internal(
102102
// select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where t2.c2=t1.c3)
103103
schema.merge(external_schema);
104104

105-
let mut expr_rewrite = TypeCoercionRewriter { schema: &schema };
105+
let mut expr_rewrite = TypeCoercionRewriter::new(&schema);
106106

107107
let name_preserver = NamePreserver::new(&plan);
108-
// apply coercion rewrite all expressions in the plan
108+
// apply coercion rewrite to all expressions in the plan individually
109109
plan.map_expressions(|expr| {
110110
let original_name = name_preserver.save(&expr)?;
111111
expr.rewrite(&mut expr_rewrite)?
112112
.map_data(|expr| original_name.restore(expr))
113113
})?
114+
// coerce join expressions specially
115+
.map_data(|plan| expr_rewrite.coerce_joins(plan))?
114116
// recompute the schema after the expressions have been rewritten as the types may have changed
115117
.map_data(|plan| plan.recompute_schema())
116118
}
@@ -119,6 +121,56 @@ pub(crate) struct TypeCoercionRewriter<'a> {
119121
pub(crate) schema: &'a DFSchema,
120122
}
121123

124+
// Inherent helpers for `TypeCoercionRewriter`: a constructor, the join-specific
// coercion pass, and the binary-operand coercion routine used by that pass.
impl<'a> TypeCoercionRewriter<'a> {
125+
/// Create a rewriter that resolves expression types against `schema`.
fn new(schema: &'a DFSchema) -> Self {
126+
Self { schema }
127+
}
128+
129+
/// Coerce join equality expressions
130+
///
131+
/// Joins must be treated specially as their equality expressions are stored
132+
/// as a parallel list of left and right expressions, rather than a single
133+
/// equality expression
134+
///
135+
/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored
136+
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)`
///
/// Any plan that is not a `LogicalPlan::Join` is returned unchanged.
/// For a join, every `(lhs, rhs)` pair in `join.on` is replaced by the pair
/// returned from `coerce_binary_op(lhs, Operator::Eq, rhs)`.
///
/// # Errors
///
/// Returns the first error produced by `coerce_binary_op` for any pair.
137+
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
138+
// only join nodes carry `on` pairs; everything else passes through as-is
let LogicalPlan::Join(mut join) = plan else {
139+
return Ok(plan);
140+
};
141+
142+
join.on = join
143+
.on
144+
.into_iter()
145+
.map(|(lhs, rhs)| {
146+
// coerce the arguments as though they were a single binary equality
147+
// expression
148+
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?;
149+
Ok((lhs, rhs))
150+
})
151+
// collecting into `Result<Vec<_>>` stops at the first failing pair
.collect::<Result<Vec<_>>>()?;
152+
153+
Ok(LogicalPlan::Join(join))
154+
}
155+
156+
/// Coerce the two operands of a binary operator.
///
/// Looks up the type of `left` and `right` against `self.schema`, passes
/// those types together with `op` to `get_input_types`, and then applies
/// `cast_to` to each operand with the corresponding returned type.
///
/// NOTE(review): `get_input_types` is defined outside this view; presumably
/// it returns the types both sides must have for `op` to be evaluated.
/// Confirm against its definition.
///
/// # Errors
///
/// Propagates any error from `get_type`, `get_input_types` or `cast_to`.
fn coerce_binary_op(
157+
&self,
158+
left: Expr,
159+
op: Operator,
160+
right: Expr,
161+
) -> Result<(Expr, Expr)> {
162+
let (left_type, right_type) = get_input_types(
163+
&left.get_type(self.schema)?,
164+
&op,
165+
&right.get_type(self.schema)?,
166+
)?;
167+
Ok((
168+
left.cast_to(&left_type, self.schema)?,
169+
right.cast_to(&right_type, self.schema)?,
170+
))
171+
}
172+
}
173+
122174
impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
123175
type Node = Expr;
124176

@@ -223,15 +275,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
223275
))))
224276
}
225277
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
226-
let (left_type, right_type) = get_input_types(
227-
&left.get_type(self.schema)?,
228-
&op,
229-
&right.get_type(self.schema)?,
230-
)?;
278+
let (left, right) = self.coerce_binary_op(*left, op, *right)?;
231279
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
232-
Box::new(left.cast_to(&left_type, self.schema)?),
280+
Box::new(left),
233281
op,
234-
Box::new(right.cast_to(&right_type, self.schema)?),
282+
Box::new(right),
235283
))))
236284
}
237285
Expr::Between(Between {

0 commit comments

Comments
 (0)