Skip to content

Commit f7fd4e1

Browse files
alamb authored and findepi committed
Minor: Simplify + document EliminateCrossJoin better (apache#10427)
1 parent 5f88c0c commit f7fd4e1

File tree

4 files changed

+41
-30
lines changed

4 files changed

+41
-30
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,8 +1085,8 @@ impl LogicalPlanBuilder {
10851085
find_valid_equijoin_key_pair(
10861086
&normalized_left_key,
10871087
&normalized_right_key,
1088-
self.plan.schema().clone(),
1089-
right.schema().clone(),
1088+
self.plan.schema(),
1089+
right.schema(),
10901090
)?.ok_or_else(||
10911091
plan_datafusion_err!(
10921092
"can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"

datafusion/expr/src/utils.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,7 @@ pub fn can_hash(data_type: &DataType) -> bool {
885885
/// Check whether all columns are from the schema.
886886
pub fn check_all_columns_from_schema(
887887
columns: &HashSet<Column>,
888-
schema: DFSchemaRef,
888+
schema: &DFSchema,
889889
) -> Result<bool> {
890890
for col in columns.iter() {
891891
let exist = schema.is_column_from_schema(col);
@@ -909,8 +909,8 @@ pub fn check_all_columns_from_schema(
909909
pub fn find_valid_equijoin_key_pair(
910910
left_key: &Expr,
911911
right_key: &Expr,
912-
left_schema: DFSchemaRef,
913-
right_schema: DFSchemaRef,
912+
left_schema: &DFSchema,
913+
right_schema: &DFSchema,
914914
) -> Result<Option<(Expr, Expr)>> {
915915
let left_using_columns = left_key.to_columns()?;
916916
let right_using_columns = right_key.to_columns()?;
@@ -920,8 +920,8 @@ pub fn find_valid_equijoin_key_pair(
920920
return Ok(None);
921921
}
922922

923-
if check_all_columns_from_schema(&left_using_columns, left_schema.clone())?
924-
&& check_all_columns_from_schema(&right_using_columns, right_schema.clone())?
923+
if check_all_columns_from_schema(&left_using_columns, left_schema)?
924+
&& check_all_columns_from_schema(&right_using_columns, right_schema)?
925925
{
926926
return Ok(Some((left_key.clone(), right_key.clone())));
927927
} else if check_all_columns_from_schema(&right_using_columns, left_schema)?

datafusion/optimizer/src/eliminate_cross_join.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl OptimizerRule for EliminateCrossJoin {
107107
left = find_inner_join(
108108
&left,
109109
&mut all_inputs,
110-
&mut possible_join_keys,
110+
&possible_join_keys,
111111
&mut all_join_keys,
112112
)?;
113113
}
@@ -144,7 +144,9 @@ impl OptimizerRule for EliminateCrossJoin {
144144
}
145145
}
146146

147-
/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
147+
/// Recursively accumulate possible_join_keys and inputs from inner joins
148+
/// (including cross joins).
149+
///
148150
/// Returns a boolean indicating whether the flattening was successful.
149151
fn try_flatten_join_inputs(
150152
plan: &LogicalPlan,
@@ -159,22 +161,19 @@ fn try_flatten_join_inputs(
159161
return Ok(false);
160162
}
161163
possible_join_keys.extend(join.on.clone());
162-
let left = &*(join.left);
163-
let right = &*(join.right);
164-
vec![left, right]
164+
vec![&join.left, &join.right]
165165
}
166166
LogicalPlan::CrossJoin(join) => {
167-
let left = &*(join.left);
168-
let right = &*(join.right);
169-
vec![left, right]
167+
vec![&join.left, &join.right]
170168
}
171169
_ => {
172170
return plan_err!("flatten_join_inputs just can call join/cross_join");
173171
}
174172
};
175173

176174
for child in children.iter() {
177-
match *child {
175+
let child = child.as_ref();
176+
match child {
178177
LogicalPlan::Join(Join {
179178
join_type: JoinType::Inner,
180179
..
@@ -184,27 +183,39 @@ fn try_flatten_join_inputs(
184183
return Ok(false);
185184
}
186185
}
187-
_ => all_inputs.push((*child).clone()),
186+
_ => all_inputs.push(child.clone()),
188187
}
189188
}
190189
Ok(true)
191190
}
192191

192+
/// Finds the next plan to join with the left input plan.
193+
///
194+
/// Finds the next `right` from `rights` that can be joined with `left_input`
195+
/// plan based on the join keys in `possible_join_keys`.
196+
///
197+
/// If such a matching `right` is found:
198+
/// 1. Adds the matching join keys to `all_join_keys`.
199+
/// 2. Returns `left_input JOIN right ON (all join keys)`.
200+
///
201+
/// If no matching `right` is found:
202+
/// 1. Removes the first plan from `rights`.
203+
/// 2. Returns `left_input CROSS JOIN right`.
193204
fn find_inner_join(
194205
left_input: &LogicalPlan,
195206
rights: &mut Vec<LogicalPlan>,
196-
possible_join_keys: &mut Vec<(Expr, Expr)>,
207+
possible_join_keys: &[(Expr, Expr)],
197208
all_join_keys: &mut HashSet<(Expr, Expr)>,
198209
) -> Result<LogicalPlan> {
199210
for (i, right_input) in rights.iter().enumerate() {
200211
let mut join_keys = vec![];
201212

202-
for (l, r) in &mut *possible_join_keys {
213+
for (l, r) in possible_join_keys.iter() {
203214
let key_pair = find_valid_equijoin_key_pair(
204215
l,
205216
r,
206-
left_input.schema().clone(),
207-
right_input.schema().clone(),
217+
left_input.schema(),
218+
right_input.schema(),
208219
)?;
209220

210221
// Save join keys
@@ -215,6 +226,7 @@ fn find_inner_join(
215226
}
216227
}
217228

229+
// Found one or more matching join keys
218230
if !join_keys.is_empty() {
219231
all_join_keys.extend(join_keys.clone());
220232
let right_input = rights.remove(i);
@@ -236,6 +248,9 @@ fn find_inner_join(
236248
}));
237249
}
238250
}
251+
252+
// no matching right plan had any join keys, cross join with the first right
253+
// plan
239254
let right = rights.remove(0);
240255
let join_schema = Arc::new(build_join_schema(
241256
left_input.schema(),

datafusion/optimizer/src/extract_equijoin_predicate.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use datafusion_common::{internal_err, DFSchema};
2424
use datafusion_expr::utils::split_conjunction_owned;
2525
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
2626
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
27-
use std::sync::Arc;
2827
// equijoin predicate
2928
type EquijoinPredicate = (Expr, Expr);
3029

@@ -122,8 +121,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {
122121

123122
fn split_eq_and_noneq_join_predicate(
124123
filter: Expr,
125-
left_schema: &Arc<DFSchema>,
126-
right_schema: &Arc<DFSchema>,
124+
left_schema: &DFSchema,
125+
right_schema: &DFSchema,
127126
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
128127
let exprs = split_conjunction_owned(filter);
129128

@@ -136,12 +135,8 @@ fn split_eq_and_noneq_join_predicate(
136135
op: Operator::Eq,
137136
ref right,
138137
}) => {
139-
let join_key_pair = find_valid_equijoin_key_pair(
140-
left,
141-
right,
142-
left_schema.clone(),
143-
right_schema.clone(),
144-
)?;
138+
let join_key_pair =
139+
find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?;
145140

146141
if let Some((left_expr, right_expr)) = join_key_pair {
147142
let left_expr_type = left_expr.get_type(left_schema)?;
@@ -172,6 +167,7 @@ mod tests {
172167
use datafusion_expr::{
173168
col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
174169
};
170+
use std::sync::Arc;
175171

176172
fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
177173
assert_optimized_plan_eq_display_indent(

0 commit comments

Comments
 (0)