Skip to content

Remove some Expr clones in EliminateCrossJoin(3%-5% faster planning) #10430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 50 additions & 73 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
// under the License.

//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use std::collections::HashSet;
use std::sync::Arc;

use crate::{utils, OptimizerConfig, OptimizerRule};

use crate::join_key_set::JoinKeySet;
use datafusion_common::{plan_err, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::{
Expand Down Expand Up @@ -55,7 +55,7 @@ impl OptimizerRule for EliminateCrossJoin {
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
let mut possible_join_keys = JoinKeySet::new();
let mut all_inputs: Vec<LogicalPlan> = vec![];
let parent_predicate = match plan {
LogicalPlan::Filter(filter) => {
Expand All @@ -76,7 +76,7 @@ impl OptimizerRule for EliminateCrossJoin {
extract_possible_join_keys(
&filter.predicate,
&mut possible_join_keys,
)?;
);
Some(&filter.predicate)
}
_ => {
Expand All @@ -101,7 +101,7 @@ impl OptimizerRule for EliminateCrossJoin {
};

// Join keys are handled locally:
let mut all_join_keys = HashSet::<(Expr, Expr)>::new();
let mut all_join_keys = JoinKeySet::new();
let mut left = all_inputs.remove(0);
while !all_inputs.is_empty() {
left = find_inner_join(
Expand Down Expand Up @@ -131,7 +131,7 @@ impl OptimizerRule for EliminateCrossJoin {
.map(|f| Some(LogicalPlan::Filter(f)))
} else {
// Remove join expressions from filter:
match remove_join_expressions(predicate, &all_join_keys)? {
match remove_join_expressions(predicate.clone(), &all_join_keys) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clone used to happen inside remove_join_expressions so I moved it up so it is clearer (and sets the stage to avoid the clone entirely in the next PR)

Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left))
.map(|f| Some(LogicalPlan::Filter(f))),
_ => Ok(Some(left)),
Expand All @@ -150,7 +150,7 @@ impl OptimizerRule for EliminateCrossJoin {
/// Returns a boolean indicating whether the flattening was successful.
fn try_flatten_join_inputs(
plan: &LogicalPlan,
possible_join_keys: &mut Vec<(Expr, Expr)>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the core change in this PR -- instead of passing around &HashSet or &Vec(), they are encapsulated into a struct now which is much more careful to clone only when needed

possible_join_keys: &mut JoinKeySet,
all_inputs: &mut Vec<LogicalPlan>,
) -> Result<bool> {
let children = match plan {
Expand All @@ -160,7 +160,7 @@ fn try_flatten_join_inputs(
// issue: https://github.com/apache/datafusion/issues/4844
return Ok(false);
}
possible_join_keys.extend(join.on.clone());
possible_join_keys.insert_all(join.on.iter());
vec![&join.left, &join.right]
}
LogicalPlan::CrossJoin(join) => {
Expand Down Expand Up @@ -204,8 +204,8 @@ fn try_flatten_join_inputs(
fn find_inner_join(
left_input: &LogicalPlan,
rights: &mut Vec<LogicalPlan>,
possible_join_keys: &[(Expr, Expr)],
all_join_keys: &mut HashSet<(Expr, Expr)>,
possible_join_keys: &JoinKeySet,
all_join_keys: &mut JoinKeySet,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is nice unification

) -> Result<LogicalPlan> {
for (i, right_input) in rights.iter().enumerate() {
let mut join_keys = vec![];
Expand All @@ -228,7 +228,7 @@ fn find_inner_join(

// Found one or more matching join keys
if !join_keys.is_empty() {
all_join_keys.extend(join_keys.clone());
all_join_keys.insert_all(join_keys.iter());
let right_input = rights.remove(i);
let join_schema = Arc::new(build_join_schema(
left_input.schema(),
Expand Down Expand Up @@ -265,90 +265,67 @@ fn find_inner_join(
}))
}

fn intersect(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved into JoinExprSet

accum: &mut Vec<(Expr, Expr)>,
vec1: &[(Expr, Expr)],
vec2: &[(Expr, Expr)],
) {
if !(vec1.is_empty() || vec2.is_empty()) {
for x1 in vec1.iter() {
for x2 in vec2.iter() {
if x1.0 == x2.0 && x1.1 == x2.1 || x1.1 == x2.0 && x1.0 == x2.1 {
accum.push((x1.0.clone(), x1.1.clone()));
}
}
}
}
}

/// Extract join keys from a WHERE clause
fn extract_possible_join_keys(expr: &Expr, accum: &mut Vec<(Expr, Expr)>) -> Result<()> {
fn extract_possible_join_keys(expr: &Expr, join_keys: &mut JoinKeySet) {
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
match op {
Operator::Eq => {
// Ensure that we don't add the same Join keys multiple times
if !(accum.contains(&(*left.clone(), *right.clone()))
|| accum.contains(&(*right.clone(), *left.clone())))
{
accum.push((*left.clone(), *right.clone()));
}
// insert handles ensuring we don't add the same Join keys multiple times
join_keys.insert(left, right);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JoinExprSet does not require cloning to check if the exprs should be inserted

}
Operator::And => {
extract_possible_join_keys(left, accum)?;
extract_possible_join_keys(right, accum)?
extract_possible_join_keys(left, join_keys);
extract_possible_join_keys(right, join_keys)
}
// Fix for issue#78 join predicates from inside of OR expr also pulled up properly.
Operator::Or => {
let mut left_join_keys = vec![];
let mut right_join_keys = vec![];
let mut left_join_keys = JoinKeySet::new();
let mut right_join_keys = JoinKeySet::new();

extract_possible_join_keys(left, &mut left_join_keys)?;
extract_possible_join_keys(right, &mut right_join_keys)?;
extract_possible_join_keys(left, &mut left_join_keys);
extract_possible_join_keys(right, &mut right_join_keys);

intersect(accum, &left_join_keys, &right_join_keys)
join_keys.insert_intersection(left_join_keys, right_join_keys)
}
_ => (),
};
}
Ok(())
}

/// Remove join expressions from a filter expression
/// Returns Some() when there are few remaining predicates in filter_expr
/// Returns None otherwise
fn remove_join_expressions(
expr: &Expr,
join_keys: &HashSet<(Expr, Expr)>,
) -> Result<Option<Expr>> {
///
/// # Returns
/// * `Some()` when there are few remaining predicates in filter_expr
/// * `None` otherwise
fn remove_join_expressions(expr: Expr, join_keys: &JoinKeySet) -> Option<Expr> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API previously cloned on all paths (including to check join keys) -- the new API does not

Also it never returns Err so I changed the signature to Option from Result<Option<..>>

match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
match op {
Operator::Eq => {
if join_keys.contains(&(*left.clone(), *right.clone()))
|| join_keys.contains(&(*right.clone(), *left.clone()))
Copy link
Contributor Author

@alamb alamb May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there were several clones (deep copies) here too to simply check if the the join keys contained the expressions which have been removed.

{
Ok(None)
} else {
Ok(Some(expr.clone()))
}
}
// Fix for issue#78 join predicates from inside of OR expr also pulled up properly.
Operator::And | Operator::Or => {
let l = remove_join_expressions(left, join_keys)?;
let r = remove_join_expressions(right, join_keys)?;
match (l, r) {
(Some(ll), Some(rr)) => Ok(Some(Expr::BinaryExpr(
BinaryExpr::new(Box::new(ll), *op, Box::new(rr)),
))),
(Some(ll), _) => Ok(Some(ll)),
(_, Some(rr)) => Ok(Some(rr)),
_ => Ok(None),
}
}
_ => Ok(Some(expr.clone())),
Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::Eq,
right,
}) if join_keys.contains(&left, &right) => {
// was a join key, so remove it
None
}
// Fix for issue#78 join predicates from inside of OR expr also pulled up properly.
Expr::BinaryExpr(BinaryExpr { left, op, right })
if matches!(op, Operator::And | Operator::Or) =>
{
let l = remove_join_expressions(*left, join_keys);
let r = remove_join_expressions(*right, join_keys);
match (l, r) {
(Some(ll), Some(rr)) => Some(Expr::BinaryExpr(BinaryExpr::new(
Box::new(ll),
op,
Box::new(rr),
))),
(Some(ll), _) => Some(ll),
(_, Some(rr)) => Some(rr),
_ => None,
}
}
_ => Ok(Some(expr.clone())),

_ => Some(expr),
}
}

Expand Down
Loading
Loading