Skip to content

Commit 38ccb00

Browse files
authored
Add swap_inputs to SMJ (#13984)
1 parent 846adf3 commit 38ccb00

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

datafusion/core/src/physical_optimizer/join_selection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,9 @@ fn hash_join_swap_subrule(
552552
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
553553
/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
554554
/// we swap join sides. Therefore, we do not consider them here.
555-
fn swap_join_according_to_unboundedness(
555+
/// This function is crate public as it is useful for downstream projects
556+
/// to implement, or experiment with, their own join selection rules.
557+
pub(crate) fn swap_join_according_to_unboundedness(
556558
hash_join: &HashJoinExec,
557559
) -> Result<Arc<dyn ExecutionPlan>> {
558560
let partition_mode = hash_join.partition_mode();

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ use crate::execution_plan::{boundedness_from_children, EmissionType};
5858
use crate::expressions::PhysicalSortExpr;
5959
use crate::joins::utils::{
6060
build_join_schema, check_join_is_valid, estimate_join_statistics,
61-
symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
61+
reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn,
62+
JoinOnRef,
6263
};
6364
use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
6465
use crate::spill::spill_record_batches;
@@ -73,7 +74,7 @@ use futures::{Stream, StreamExt};
7374
/// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
7475
/// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
7576
/// inputs where one or both of the inputs don't fit in the available memory.
76-
///
77+
///
7778
/// # Join Expressions
7879
///
7980
/// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
@@ -311,6 +312,37 @@ impl SortMergeJoinExec {
311312
boundedness_from_children([left, right]),
312313
)
313314
}
315+
316+
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
317+
let left = self.left();
318+
let right = self.right();
319+
let new_join = SortMergeJoinExec::try_new(
320+
Arc::clone(right),
321+
Arc::clone(left),
322+
self.on()
323+
.iter()
324+
.map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
325+
.collect::<Vec<_>>(),
326+
self.filter().as_ref().map(JoinFilter::swap),
327+
self.join_type().swap(),
328+
self.sort_options.clone(),
329+
self.null_equals_null,
330+
)?;
331+
332+
// TODO: OR this condition with having a built-in projection (like
333+
// ordinary hash join) when we support it.
334+
if matches!(
335+
self.join_type(),
336+
JoinType::LeftSemi
337+
| JoinType::RightSemi
338+
| JoinType::LeftAnti
339+
| JoinType::RightAnti
340+
) {
341+
Ok(Arc::new(new_join))
342+
} else {
343+
reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
344+
}
345+
}
314346
}
315347

316348
impl DisplayAs for SortMergeJoinExec {

0 commit comments

Comments
 (0)