diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 1cd68aaaef95..6a94250741a8 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -196,7 +196,9 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
 #[cfg(test)]
 mod tests {
     use crate::{
-        physical_plan::{hash_join::PartitionMode, Statistics},
+        physical_plan::{
+            displayable, hash_join::PartitionMode, ColumnStatistics, Statistics,
+        },
         test::exec::StatisticsExec,
     };
 
@@ -204,6 +206,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
 
     fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
         let big = Arc::new(StatisticsExec::new(
@@ -226,6 +229,75 @@ mod tests {
         (big, small)
     }
 
+    /// Create a column statistics vector for a single column
+    /// that has the given min/max/distinct_count properties.
+    ///
+    /// Given min/max will be mapped to a [`ScalarValue`] if
+    /// they are not `None`.
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> Option<Vec<ColumnStatistics>> {
+        Some(vec![ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }])
+    }
+
+    /// Returns three plans with statistics of (min, max, distinct_count)
+    /// * big 100K rows @ (0, 50k, 50k)
+    /// * medium 10K rows @ (1k, 5k, 1k)
+    /// * small 1K rows @ (0, 100k, 1k)
+    fn create_nested_with_min_max() -> (
+        Arc<dyn ExecutionPlan>,
+        Arc<dyn ExecutionPlan>,
+        Arc<dyn ExecutionPlan>,
+    ) {
+        let big = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100_000),
+                column_statistics: create_column_stats(
+                    Some(0),
+                    Some(50_000),
+                    Some(50_000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+        ));
+
+        let medium = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10_000),
+                column_statistics: create_column_stats(
+                    Some(1000),
+                    Some(5000),
+                    Some(1000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]),
+        ));
+
+        let small = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(1000),
+                column_statistics: create_column_stats(
+                    Some(0),
+                    Some(100_000),
+                    Some(1000),
+                ),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+        ));
+
+        (big, medium, small)
+    }
+
     #[tokio::test]
     async fn test_join_with_swap() {
         let (big, small) = create_big_and_small();
@@ -274,6 +346,82 @@ mod tests {
         );
     }
 
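Aside for reviewers (not part of the patch): the swap decisions asserted in the tests below fall out of the inner-join estimate introduced in join_utils.rs further down in this diff. A minimal, self-contained sketch of that arithmetic, applied to the big/small statistics defined above (the helper name is illustrative only):

```rust
// Standalone sketch: the inner join estimate used by the optimizer is
// |left| * |right| / max(distinct(left key), distinct(right key)).
fn sketch_inner_estimate(
    left_rows: usize,
    right_rows: usize,
    left_distinct: usize,
    right_distinct: usize,
) -> usize {
    (left_rows * right_rows) / left_distinct.max(right_distinct)
}

fn main() {
    // small (1_000 rows, 1_000 distinct) joined with big (100_000 rows,
    // 50_000 distinct) is estimated at 2_000 rows, which is what makes the
    // optimizer prefer it as the build side over medium's exact 10_000 rows
    // in `test_nested_join_swap` below.
    assert_eq!(sketch_inner_estimate(1_000, 100_000, 1_000, 50_000), 2_000);
}
```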
+    /// Compare the input plan with the plan after running the probe order optimizer.
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines =
+                $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
+
+            let optimized = HashBuildProbeOrder::new()
+                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .unwrap();
+
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = plan.split("\n").collect::<Vec<&str>>();
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
+    #[tokio::test]
+    async fn test_nested_join_swap() {
+        let (big, medium, small) = create_nested_with_min_max();
+
+        // Form the inner join: big JOIN small
+        let child_join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            None,
+            &JoinType::Inner,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+        let child_schema = child_join.schema();
+
+        // Form the join tree `medium LEFT JOIN (big JOIN small)`
+        let join = HashJoinExec::try_new(
+            Arc::clone(&medium),
+            Arc::new(child_join),
+            vec![(
+                Column::new_with_schema("medium_col", &medium.schema()).unwrap(),
+                Column::new_with_schema("small_col", &child_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+            &false,
+        )
+        .unwrap();
+
+        // Hash join uses the left side to build the hash table, and the right side to probe it.
+        // We want to keep the left side as small as possible, so if we can estimate (with a
+        // reasonable margin of error) that the left side is bigger than the right side, we
+        // should swap the sides.
+        //
+        // The first hash join's left is the 'small' table (with 1000 rows), and the second hash
+        // join's left is the result of (small IJ big), which has an estimated cardinality of
+        // 2000 rows (vs medium, which has an exact cardinality of 10_000 rows).
+        let expected = [
+            "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]",
+            "  HashJoinExec: mode=CollectLeft, join_type=Right, on=[(Column { name: \"small_col\", index: 1 }, Column { name: \"medium_col\", index: 0 })]",
+            "    ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]",
+            "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"small_col\", index: 0 }, Column { name: \"big_col\", index: 0 })]",
+            "        StatisticsExec: col_count=1, row_count=Some(1000)",
+            "        StatisticsExec: col_count=1, row_count=Some(100000)",
+            "    StatisticsExec: col_count=1, row_count=Some(10000)",
+            "",
+        ];
+        assert_optimized!(expected, join);
+    }
+
     #[tokio::test]
     async fn test_join_no_swap() {
         let (big, small) = create_big_and_small();
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index a22bcbc13c7e..c0b8eaea34aa 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -59,7 +59,8 @@ use super::{
     coalesce_partitions::CoalescePartitionsExec,
     expressions::PhysicalSortExpr,
     join_utils::{
-        build_join_schema, check_join_is_valid, ColumnIndex, JoinFilter, JoinOn, JoinSide,
+        build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
+        JoinFilter, JoinOn, JoinSide,
     },
 };
 use super::{
@@ -385,7 +386,12 @@ impl ExecutionPlan for HashJoinExec {
         // TODO stats: it is not possible in general to know the output size of joins
         // There are some special cases though, for example:
         // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
-        Statistics::default()
+        estimate_join_statistics(
+            self.left.clone(),
+            self.right.clone(),
+            self.on.clone(),
+            &self.join_type,
+        )
     }
 }
diff --git a/datafusion/core/src/physical_plan/join_utils.rs b/datafusion/core/src/physical_plan/join_utils.rs
index cbabf548c12b..efd178d97204 100644
--- a/datafusion/core/src/physical_plan/join_utils.rs
+++ b/datafusion/core/src/physical_plan/join_utils.rs
@@ -26,11 +26,14 @@ use datafusion_physical_expr::PhysicalExpr;
 use futures::future::{BoxFuture, Shared};
 use futures::{ready, FutureExt};
 use parking_lot::Mutex;
+use std::cmp::max;
 use std::collections::HashSet;
 use std::future::Future;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use super::{ColumnStatistics, ExecutionPlan, Statistics};
+
 /// The on clause of the join, as vector of (left, right) columns.
 pub type JoinOn = Vec<(Column, Column)>;
 /// Reference for JoinOn.
@@ -296,6 +299,154 @@ impl<T> Clone for OnceFut<T> {
     }
 }
 
+/// A shared state between statistic aggregators for a join
+/// operation.
+#[derive(Clone, Debug, Default)]
+struct PartialJoinStatistics {
+    pub num_rows: usize,
+    pub column_statistics: Vec<ColumnStatistics>,
+}
+
+/// Estimate the statistics for the given join's output.
+pub(crate) fn estimate_join_statistics(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    join_type: &JoinType,
+) -> Statistics {
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on);
+    let (num_rows, column_statistics) = match join_stats {
+        Some(stats) => (Some(stats.num_rows), Some(stats.column_statistics)),
+        None => (None, None),
+    };
+    Statistics {
+        num_rows,
+        total_byte_size: None,
+        column_statistics,
+        is_exact: false,
+    }
+}
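Aside (not part of the patch): `estimate_join_statistics` deliberately returns a partial `Statistics` — only `num_rows` and `column_statistics` are filled in, `total_byte_size` stays `None`, and `is_exact` is always `false`. A small sketch of the shape a caller observes, using the same DataFusion types the patch imports (the concrete values here are hypothetical):

```rust
use datafusion::physical_plan::{ColumnStatistics, Statistics};

// Hypothetical value, mirroring what estimate_join_statistics returns when
// the cardinality estimation succeeds.
fn example_estimated_join_stats() -> Statistics {
    Statistics {
        num_rows: Some(2_000), // estimated output cardinality
        total_byte_size: None, // byte size is never estimated by this patch
        column_statistics: Some(vec![ColumnStatistics::default()]),
        is_exact: false, // estimates are hints, not guarantees
    }
}

fn main() {
    assert_eq!(example_estimated_join_stats().num_rows, Some(2_000));
}
```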
+
+// Estimate the cardinality for the given join with input statistics.
+fn estimate_join_cardinality(
+    join_type: &JoinType,
+    left_stats: Statistics,
+    right_stats: Statistics,
+    on: &JoinOn,
+) -> Option<PartialJoinStatistics> {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+            let left_num_rows = left_stats.num_rows?;
+            let right_num_rows = right_stats.num_rows?;
+
+            // Take the left_col_stats and right_col_stats using the index
+            // obtained from the index() method of each element of 'on'.
+            let all_left_col_stats = left_stats.column_statistics?;
+            let all_right_col_stats = right_stats.column_statistics?;
+            let (left_col_stats, right_col_stats) = on
+                .iter()
+                .map(|(left, right)| {
+                    (
+                        all_left_col_stats[left.index()].clone(),
+                        all_right_col_stats[right.index()].clone(),
+                    )
+                })
+                .unzip::<_, _, Vec<_>, Vec<_>>();
+
+            let ij_cardinality = estimate_inner_join_cardinality(
+                left_num_rows,
+                right_num_rows,
+                left_col_stats,
+                right_col_stats,
+            )?;
+
+            // The cardinality for inner join can also be used to estimate
+            // the cardinality of left/right/full outer joins, as long as it
+            // is greater than the minimum cardinality constraints of these
+            // joins (so that we don't underestimate the cardinality).
+            let cardinality = match join_type {
+                JoinType::Inner => ij_cardinality,
+                JoinType::Left => max(ij_cardinality, left_num_rows),
+                JoinType::Right => max(ij_cardinality, right_num_rows),
+                JoinType::Full => {
+                    max(ij_cardinality, left_num_rows)
+                        + max(ij_cardinality, right_num_rows)
+                        - ij_cardinality
+                }
+                _ => unreachable!(),
+            };
+
+            Some(PartialJoinStatistics {
+                num_rows: cardinality,
+                // We don't do anything specific here, just combine the existing
+                // statistics, which might yield subpar results (although they
+                // remain correct, especially regarding min/max). For a better
+                // estimation, we need filter selectivity analysis first.
+                column_statistics: all_left_col_stats
+                    .into_iter()
+                    .chain(all_right_col_stats.into_iter())
+                    .collect(),
+            })
+        }
+
+        JoinType::Semi => None,
+        JoinType::Anti => None,
+    }
+}
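Aside (not part of the patch): the outer-join arms above only ever raise the inner estimate to the outer side's row count, since every outer-side row appears at least once in the output. The arithmetic, sketched with the same numbers that `test_join_cardinality` below expects:

```rust
use std::cmp::max;

fn main() {
    // Inner estimate of 800 rows for a 1_000-row left and 2_000-row right input.
    let (inner, left_rows, right_rows) = (800usize, 1_000usize, 2_000usize);

    // LEFT/RIGHT outer joins cannot produce fewer rows than their outer side.
    assert_eq!(max(inner, left_rows), 1_000);
    assert_eq!(max(inner, right_rows), 2_000);

    // FULL outer join: inclusion-exclusion over the two one-sided bounds.
    assert_eq!(max(inner, left_rows) + max(inner, right_rows) - inner, 2_200);
}
```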
+
+/// Estimate the inner join cardinality by using the basic building blocks of
+/// column-level statistics and the total row count. This is a very naive and
+/// very conservative implementation that can quickly give up if there are not
+/// enough input statistics.
+fn estimate_inner_join_cardinality(
+    left_num_rows: usize,
+    right_num_rows: usize,
+    left_col_stats: Vec<ColumnStatistics>,
+    right_col_stats: Vec<ColumnStatistics>,
+) -> Option<usize> {
+    // The algorithm here is partly based on the non-histogram selectivity estimation
+    // from Spark's Catalyst optimizer.
+
+    let mut join_selectivity = None;
+    for (left_stat, right_stat) in left_col_stats.iter().zip(right_col_stats.iter()) {
+        if (left_stat.min_value.clone()? > right_stat.max_value.clone()?)
+            || (left_stat.max_value.clone()? < right_stat.min_value.clone()?)
+        {
+            // If there is no overlap, then we can not accurately estimate
+            // the join cardinality. We could in theory use this information
+            // to point out that the join will not produce any rows, but that
+            // would require some extra information (namely whether the
+            // statistics are exact). For now, we just give up.
+            return None;
+        }
+
+        let max_distinct = max(left_stat.distinct_count, right_stat.distinct_count);
+        if max_distinct > join_selectivity {
+            // Seems like there are a few implementations of this algorithm that implement
+            // exponential decay for the selectivity (like Hive's Optiq Optimizer). Needs
+            // further exploration.
+            join_selectivity = max_distinct;
+        }
+    }
+
+    // With the assumption that the smaller input's domain is generally represented in the
+    // bigger input's domain, we can estimate the inner join's cardinality by taking the
+    // Cartesian product of the two inputs and normalizing it by the selectivity factor.
+    match join_selectivity {
+        Some(selectivity) if selectivity > 0 => {
+            Some((left_num_rows * right_num_rows) / selectivity)
+        }
+        // Since we don't have any information about the selectivity (which is derived
+        // from the number of distinct rows information), we can give up here for now
+        // and let other passes handle this (otherwise we would need to produce an
+        // overestimation using just the Cartesian product).
+        _ => None,
+    }
+}
+
 enum OnceFutState<T> {
     Pending(OnceFutPending<T>),
     Ready(Arc<Result<T>>),
 }
@@ -347,6 +498,7 @@ impl<T: 'static> OnceFut<T> {
 mod tests {
     use super::*;
     use arrow::datatypes::DataType;
+    use datafusion_common::ScalarValue;
 
     fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> Result<()> {
         let left = left
@@ -461,4 +613,185 @@
         Ok(())
     }
+
+    fn create_stats(
+        num_rows: Option<usize>,
+        column_stats: Option<Vec<ColumnStatistics>>,
+    ) -> Statistics {
+        Statistics {
+            num_rows,
+            column_statistics: column_stats,
+            ..Default::default()
+        }
+    }
+
+    fn create_column_stats(
+        min: Option<u64>,
+        max: Option<u64>,
+        distinct_count: Option<usize>,
+    ) -> ColumnStatistics {
+        ColumnStatistics {
+            distinct_count,
+            min_value: min.map(|size| ScalarValue::UInt64(Some(size))),
+            max_value: max.map(|size| ScalarValue::UInt64(Some(size))),
+            ..Default::default()
+        }
+    }
+
+    type PartialStats = (usize, u64, u64, Option<usize>);
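Aside (not part of the patch): the `max_distinct > join_selectivity` comparison in `estimate_inner_join_cardinality` leans on `Option<usize>`'s derived `Ord`, in which `None` sorts below any `Some`. A tiny sketch of that behavior, which the edge cases below exercise:

```rust
use std::cmp::max;

fn main() {
    // None (no distinct count known) always loses to a known count...
    assert!(Some(5usize) > None);
    assert_eq!(max(Some(5usize), None), Some(5));
    // ...and the selectivity stays None when neither side knows its count,
    // which is what makes the estimator give up in that case.
    assert_eq!(max(None::<usize>, None), None);
}
```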
+
+    // This is mainly for validating all the edge cases of the estimation, but
+    // more advanced (and real-world) test cases are below, where we need some
+    // control over the expected output (since it varies from join type to join type).
+    #[test]
+    fn test_inner_join_cardinality_single_column() -> Result<()> {
+        let cases: Vec<(PartialStats, PartialStats, Option<usize>)> = vec![
+            // -----------------------------------------------------------------------------
+            // | left(rows, min, max, distinct), right(rows, min, max, distinct), expected |
+            // -----------------------------------------------------------------------------
+
+            // distinct(left) is None OR distinct(right) is None
+            //
+            // len(left) = len(right), len(left) * len(right)
+            ((10, 0, 10, None), (10, 0, 10, None), None),
+            // len(left) > len(right) OR len(left) < len(right), len(left) * len(right)
+            ((10, 0, 10, None), (5, 0, 10, None), None),
+            ((5, 0, 10, None), (10, 0, 10, None), None),
+            ((10, 0, 10, None), (5, 0, 10, None), None),
+            ((5, 0, 10, None), (10, 0, 10, None), None),
+            // min(left) > max(right) OR min(right) > max(left), None
+            ((10, 0, 10, None), (10, 11, 20, None), None),
+            ((10, 11, 20, None), (10, 0, 10, None), None),
+            ((10, 5, 10, None), (10, 11, 3, None), None),
+            ((10, 10, 5, None), (10, 3, 7, None), None),
+            // distinct(left) is not None AND distinct(right) is not None
+            //
+            // len(left) = len(right), len(left) * len(right) / max(distinct(left), distinct(right))
+            ((10, 0, 10, Some(5)), (10, 0, 10, Some(5)), Some(20)),
+            ((10, 0, 10, Some(10)), (10, 0, 10, Some(5)), Some(10)),
+            ((10, 0, 10, Some(5)), (10, 0, 10, Some(10)), Some(10)),
+        ];
+
+        for (left_info, right_info, expected_cardinality) in cases {
+            let left_num_rows = left_info.0;
+            let left_col_stats = vec![create_column_stats(
+                Some(left_info.1),
+                Some(left_info.2),
+                left_info.3,
+            )];
+
+            let right_num_rows = right_info.0;
+            let right_col_stats = vec![create_column_stats(
+                Some(right_info.1),
+                Some(right_info.2),
+                right_info.3,
+            )];
+
+            assert_eq!(
+                estimate_inner_join_cardinality(
+                    left_num_rows,
+                    right_num_rows,
+                    left_col_stats.clone(),
+                    right_col_stats.clone(),
+                ),
+                expected_cardinality
+            );
+
+            // We should also be able to use join_cardinality to get the same results
+            let join_type = JoinType::Inner;
+            let join_on = vec![(Column::new("a", 0), Column::new("b", 0))];
+            let partial_join_stats = estimate_join_cardinality(
+                &join_type,
+                create_stats(Some(left_num_rows), Some(left_col_stats.clone())),
+                create_stats(Some(right_num_rows), Some(right_col_stats.clone())),
+                &join_on,
+            );
+
+            assert_eq!(
+                partial_join_stats.clone().map(|s| s.num_rows),
+                expected_cardinality
+            );
+            assert_eq!(
+                partial_join_stats.map(|s| s.column_statistics),
+                expected_cardinality.map(|_| [left_col_stats, right_col_stats].concat())
+            );
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_inner_join_cardinality_multiple_column() -> Result<()> {
+        let left_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(100)),
+            create_column_stats(Some(100), Some(500), Some(150)),
+        ];
+
+        let right_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(50)),
+            create_column_stats(Some(100), Some(500), Some(200)),
+        ];
+
+        // We have statistics about 4 columns, where the highest distinct
+        // count is 200, so we are going to pick it.
+        assert_eq!(
+            estimate_inner_join_cardinality(400, 400, left_col_stats, right_col_stats),
+            Some((400 * 400) / 200)
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_join_cardinality() -> Result<()> {
+        // Left table (rows=1000)
+        //   a: min=0, max=100, distinct=100
+        //   b: min=0, max=500, distinct=500
+        //   x: min=1000, max=10000, distinct=None
+        //
+        // Right table (rows=2000)
+        //   c: min=0, max=100, distinct=50
+        //   d: min=0, max=2000, distinct=2500 (how? some inexact statistics)
+        //   y: min=0, max=100, distinct=None
+        //
+        // Join on a=c, b=d (ignore x/y)
+        let cases = vec![
+            (JoinType::Inner, 800),
+            (JoinType::Left, 1000),
+            (JoinType::Right, 2000),
+            (JoinType::Full, 2200),
+        ];
+
+        let left_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(100)),
+            create_column_stats(Some(0), Some(500), Some(500)),
+            create_column_stats(Some(1000), Some(10000), None),
+        ];
+
+        let right_col_stats = vec![
+            create_column_stats(Some(0), Some(100), Some(50)),
+            create_column_stats(Some(0), Some(2000), Some(2500)),
+            create_column_stats(Some(0), Some(100), None),
+        ];
+
+        for (join_type, expected_num_rows) in cases {
+            let join_on = vec![
+                (Column::new("a", 0), Column::new("c", 0)),
+                (Column::new("b", 1), Column::new("d", 1)),
+            ];
+
+            let partial_join_stats = estimate_join_cardinality(
+                &join_type,
+                create_stats(Some(1000), Some(left_col_stats.clone())),
+                create_stats(Some(2000), Some(right_col_stats.clone())),
+                &join_on,
+            )
+            .unwrap();
+            assert_eq!(partial_join_stats.num_rows, expected_num_rows);
+            assert_eq!(
+                partial_join_stats.column_statistics,
+                [left_col_stats.clone(), right_col_stats.clone()].concat()
+            );
+        }
+
+        Ok(())
+    }
 }
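Aside (not part of the patch): the Inner row count expected by `test_join_cardinality` can be re-derived by hand. A compact sketch of the per-column selectivity scan, with the same statistics as the test (names are illustrative only):

```rust
use std::cmp::max;

fn main() {
    let (left_rows, right_rows) = (1_000usize, 2_000usize);

    // Distinct counts for the two join-key pairs: (a, c) and (b, d).
    let on_distincts = [(100usize, 50usize), (500, 2_500)];

    // Selectivity is the largest per-pair max distinct count seen in the loop.
    let selectivity = on_distincts.iter().map(|(l, r)| max(*l, *r)).max().unwrap();
    assert_eq!(selectivity, 2_500);

    // Cartesian product normalized by the selectivity factor: the Inner case.
    assert_eq!((left_rows * right_rows) / selectivity, 800);
}
```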