diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index d8e310a0dd21..84354c8c0e9a 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1593,7 +1593,6 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-expr-common", - "datafusion-functions-aggregate", "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index 95da1767a3ef..f5ecd41ab11e 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -15,31 +15,30 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::config::ConfigOptions; -use datafusion_execution::TaskContext; -use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics; -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use datafusion_physical_plan::aggregates::AggregateExec; -use datafusion_physical_plan::projection::ProjectionExec; -use datafusion_physical_plan::ExecutionPlan; use std::sync::Arc; -use datafusion_common::Result; - -use datafusion_physical_plan::aggregates::PhysicalGroupBy; -use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion_physical_plan::common; -use datafusion_physical_plan::filter::FilterExec; -use datafusion_physical_plan::memory::MemoryExec; +use crate::physical_optimizer::test_utils::TestAggregate; use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_int64_array; +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{self, cast}; -use datafusion_physical_optimizer::test_utils::TestAggregate; +use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::aggregates::AggregateMode; +use datafusion_physical_plan::aggregates::PhysicalGroupBy; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::common; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::memory::MemoryExec; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::ExecutionPlan; /// Mock data using a MemoryExec which has an exact count statistic fn mock_data() -> Result> { diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index 85efebf2386a..74388618015b 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -22,11 +22,10 @@ use std::sync::Arc; -use crate::physical_optimizer::parquet_exec; +use crate::physical_optimizer::test_utils::{parquet_exec, trim_plan_display}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; -use datafusion::physical_optimizer::test_utils::trim_plan_display; use datafusion_common::config::ConfigOptions; 
use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index ea75e7d0e5f4..856f7dc8e8a9 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -19,7 +19,11 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; -use crate::physical_optimizer::parquet_exec_with_sort; +use crate::physical_optimizer::test_utils::{ + check_integrity, coalesce_partitions_exec, repartition_exec, schema, + sort_merge_join_exec, sort_preserving_merge_exec, +}; +use crate::physical_optimizer::test_utils::{parquet_exec_with_sort, trim_plan_display}; use arrow::compute::SortOptions; use datafusion::config::ConfigOptions; @@ -40,11 +44,6 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_optimizer::enforce_distribution::*; use datafusion_physical_optimizer::enforce_sorting::EnforceSorting; use datafusion_physical_optimizer::output_requirements::OutputRequirements; -use datafusion_physical_optimizer::test_utils::trim_plan_display; -use datafusion_physical_optimizer::test_utils::{ - check_integrity, coalesce_partitions_exec, repartition_exec, schema, - sort_merge_join_exec, sort_preserving_merge_exec, -}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, @@ -293,7 +292,7 @@ fn hash_join_exec( join_on: &JoinOn, join_type: &JoinType, ) -> Arc { - datafusion_physical_optimizer::test_utils::hash_join_exec( + crate::physical_optimizer::test_utils::hash_join_exec( left, right, join_on.clone(), diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 4ca8381f2672..66bd1c37d3a0 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -17,7 +17,14 @@ use std::sync::Arc; -use crate::physical_optimizer::parquet_exec; +use crate::physical_optimizer::test_utils::{ + aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec, + coalesce_partitions_exec, create_test_schema, create_test_schema2, + create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, + local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr, + sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, + spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec, +}; use datafusion_physical_plan::displayable; use arrow::compute::SortOptions; @@ -37,15 +44,14 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE use datafusion_physical_plan::{get_plan_string, ExecutionPlan}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TreeNode, TransformedResult}; -use datafusion_physical_optimizer::test_utils::{check_integrity, bounded_window_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec, coalesce_batches_exec, aggregate_exec, 
RequirementsTestExec}; use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; - use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::sorts::sort::SortExec; + use rstest::rstest; /// Create a csv exec for tests diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs new file mode 100644 index 000000000000..ae7adacadb19 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -0,0 +1,1494 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; +use std::{ + any::Any, + pin::Pin, + task::{Context, Poll}, +}; + +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow_schema::SchemaRef; +use datafusion_common::config::ConfigOptions; +use datafusion_common::JoinSide; +use datafusion_common::{stats::Precision, ColumnStatistics, JoinType, ScalarValue}; +use datafusion_common::{Result, Statistics}; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr}; +use datafusion_physical_expr::intervals::utils::check_support; +use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; +use datafusion_physical_optimizer::join_selection::{ + hash_join_swap_subrule, JoinSelection, +}; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::displayable; +use datafusion_physical_plan::joins::utils::ColumnIndex; +use datafusion_physical_plan::joins::utils::JoinFilter; +use datafusion_physical_plan::joins::{HashJoinExec, NestedLoopJoinExec, PartitionMode}; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::ExecutionPlanProperties; +use datafusion_physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; + +use futures::Stream; +use rstest::rstest; + +/// Return statistics for empty table +fn empty_statistics() -> Statistics { + Statistics { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::new_unknown()], + } +} + +/// Get table thresholds: (num_rows, byte_size) +fn get_thresholds() -> (usize, 
usize) { + let optimizer_options = ConfigOptions::new().optimizer; + ( + optimizer_options.hash_join_single_partition_threshold_rows, + optimizer_options.hash_join_single_partition_threshold, + ) +} + +/// Return statistics for small table +fn small_statistics() -> Statistics { + let (threshold_num_rows, threshold_byte_size) = get_thresholds(); + Statistics { + num_rows: Precision::Inexact(threshold_num_rows / 128), + total_byte_size: Precision::Inexact(threshold_byte_size / 128), + column_statistics: vec![ColumnStatistics::new_unknown()], + } +} + +/// Return statistics for big table +fn big_statistics() -> Statistics { + let (threshold_num_rows, threshold_byte_size) = get_thresholds(); + Statistics { + num_rows: Precision::Inexact(threshold_num_rows * 2), + total_byte_size: Precision::Inexact(threshold_byte_size * 2), + column_statistics: vec![ColumnStatistics::new_unknown()], + } +} + +/// Return statistics for a bigger table +fn bigger_statistics() -> Statistics { + let (threshold_num_rows, threshold_byte_size) = get_thresholds(); + Statistics { + num_rows: Precision::Inexact(threshold_num_rows * 4), + total_byte_size: Precision::Inexact(threshold_byte_size * 4), + column_statistics: vec![ColumnStatistics::new_unknown()], + } +} + +fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) { + let big = Arc::new(StatisticsExec::new( + big_statistics(), + Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), + )); + + let small = Arc::new(StatisticsExec::new( + small_statistics(), + Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), + )); + (big, small) +} + +/// Create a column statistics vector for a single column +/// that has the given min/max/distinct_count properties. +/// +/// The given min/max will be mapped to a [`ScalarValue`] if +/// they are not `None`.
+fn create_column_stats( + min: Option<u64>, + max: Option<u64>, + distinct_count: Option<usize>, +) -> Vec<ColumnStatistics> { + vec![ColumnStatistics { + distinct_count: distinct_count + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + min_value: min + .map(|size| Precision::Inexact(ScalarValue::UInt64(Some(size)))) + .unwrap_or(Precision::Absent), + max_value: max + .map(|size| Precision::Inexact(ScalarValue::UInt64(Some(size)))) + .unwrap_or(Precision::Absent), + ..Default::default() + }] +} + +/// Create join filter for NLJoinExec with expression `big_col > small_col` +/// where both columns are 0-indexed and come from left and right inputs respectively +fn nl_join_filter() -> Option<JoinFilter> { + let column_indices = vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ]; + let intermediate_schema = Schema::new(vec![ + Field::new("big_col", DataType::Int32, false), + Field::new("small_col", DataType::Int32, false), + ]); + let expression = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("big_col", &intermediate_schema).unwrap()), + Operator::Gt, + Arc::new(Column::new_with_schema("small_col", &intermediate_schema).unwrap()), + )) as _; + Some(JoinFilter::new( + expression, + column_indices, + Arc::new(intermediate_schema), + )) +} + +/// Returns three plans with statistics of (min, max, distinct_count) +/// * big 100K rows @ (0, 50k, 50k) +/// * medium 10K rows @ (1k, 5k, 1k) +/// * small 1K rows @ (0, 100k, 1k) +fn create_nested_with_min_max() -> ( + Arc<dyn ExecutionPlan>, + Arc<dyn ExecutionPlan>, + Arc<dyn ExecutionPlan>, +) { + let big = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100_000), + column_statistics: create_column_stats(Some(0), Some(50_000), Some(50_000)), + total_byte_size: Precision::Absent, + }, + Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), + )); + + let medium = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(10_000), + column_statistics: create_column_stats(Some(1000), Some(5000), Some(1000)), + total_byte_size: Precision::Absent, + }, + Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]), + )); + + let small = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + column_statistics: create_column_stats(Some(0), Some(100_000), Some(1000)), + total_byte_size: Precision::Absent, + }, + Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), + )); + + (big, medium, small) +} + +#[tokio::test] +async fn test_join_with_swap() { + let (big, small) = create_big_and_small(); + + let join = Arc::new( + HashJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + vec![( + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), + Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()), + )], + None, + &JoinType::Left, + None, + PartitionMode::CollectLeft, + false, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join, &ConfigOptions::new()) + .unwrap(); + + let swapping_projection = optimized_join + .as_any() + .downcast_ref::<ProjectionExec>() + .expect("A proj is required to swap columns back to their original order"); + + assert_eq!(swapping_projection.expr().len(), 2); + let (col, name) = &swapping_projection.expr()[0]; + assert_eq!(name, "big_col"); + assert_col_expr(col, "big_col", 1); + let (col, name) = &swapping_projection.expr()[1]; + assert_eq!(name, "small_col"); + assert_col_expr(col, "small_col", 0); + + let swapped_join = swapping_projection + .input() + .as_any() +
.downcast_ref::() + .expect("The type of the plan should not be changed"); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); +} + +#[tokio::test] +async fn test_left_join_no_swap() { + let (big, small) = create_big_and_small(); + + let join = Arc::new( + HashJoinExec::try_new( + Arc::clone(&small), + Arc::clone(&big), + vec![( + Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()), + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), + )], + None, + &JoinType::Left, + None, + PartitionMode::CollectLeft, + false, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join, &ConfigOptions::new()) + .unwrap(); + + let swapped_join = optimized_join + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); +} + +#[tokio::test] +async fn test_join_with_swap_semi() { + let join_types = [JoinType::LeftSemi, JoinType::LeftAnti]; + for join_type in join_types { + let (big, small) = create_big_and_small(); + + let join = HashJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + vec![( + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), + Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()), + )], + None, + &join_type, + None, + PartitionMode::Partitioned, + false, + ) + .unwrap(); + + let original_schema = join.schema(); + + let optimized_join = JoinSelection::new() + .optimize(Arc::new(join), &ConfigOptions::new()) + .unwrap(); + + let swapped_join = optimized_join + .as_any() + .downcast_ref::() + .expect( + "A proj is not required to swap columns back to their original order", + ); + + assert_eq!(swapped_join.schema().fields().len(), 1); + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); + assert_eq!(original_schema, swapped_join.schema()); + } +} + +/// Compare the input plan with the plan after running the probe order optimizer. +macro_rules! 
assert_optimized { + ($EXPECTED_LINES: expr, $PLAN: expr) => { + let expected_lines = $EXPECTED_LINES.iter().map(|s| *s).collect::>(); + + let plan = Arc::new($PLAN); + let optimized = JoinSelection::new() + .optimize(plan.clone(), &ConfigOptions::new()) + .unwrap(); + + let plan_string = displayable(optimized.as_ref()).indent(true).to_string(); + let actual_lines = plan_string.split("\n").collect::>(); + + assert_eq!( + &expected_lines, &actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected_lines, actual_lines + ); + }; +} + +#[tokio::test] +async fn test_nested_join_swap() { + let (big, medium, small) = create_nested_with_min_max(); + + // Form the inner join: big JOIN small + let child_join = HashJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + vec![( + col("big_col", &big.schema()).unwrap(), + col("small_col", &small.schema()).unwrap(), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + false, + ) + .unwrap(); + let child_schema = child_join.schema(); + + // Form join tree `medium LEFT JOIN (big JOIN small)` + let join = HashJoinExec::try_new( + Arc::clone(&medium), + Arc::new(child_join), + vec![( + col("medium_col", &medium.schema()).unwrap(), + col("small_col", &child_schema).unwrap(), + )], + None, + &JoinType::Left, + None, + PartitionMode::CollectLeft, + false, + ) + .unwrap(); + + // Hash join uses the left side to build the hash table, and right side to probe it. We want + // to keep left as small as possible, so if we can estimate (with a reasonable margin of error) + // that the left side is smaller than the right side, we should swap the sides. + // + // The first hash join's left is 'small' table (with 1000 rows), and the second hash join's + // left is the F(small IJ big) which has an estimated cardinality of 2000 rows (vs medium which + // has an exact cardinality of 10_000 rows). 
+ let expected = [ + "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]", + " HashJoinExec: mode=CollectLeft, join_type=Right, on=[(small_col@1, medium_col@0)]", + " ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]", + " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(small_col@0, big_col@0)]", + " StatisticsExec: col_count=1, row_count=Inexact(1000)", + " StatisticsExec: col_count=1, row_count=Inexact(100000)", + " StatisticsExec: col_count=1, row_count=Inexact(10000)", + "", + ]; + assert_optimized!(expected, join); +} + +#[tokio::test] +async fn test_join_no_swap() { + let (big, small) = create_big_and_small(); + let join = Arc::new( + HashJoinExec::try_new( + Arc::clone(&small), + Arc::clone(&big), + vec![( + Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()), + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), + )], + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + false, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join, &ConfigOptions::new()) + .unwrap(); + + let swapped_join = optimized_join + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); +} + +#[rstest( + join_type, + case::inner(JoinType::Inner), + case::left(JoinType::Left), + case::right(JoinType::Right), + case::full(JoinType::Full) +)] +#[tokio::test] +async fn test_nl_join_with_swap(join_type: JoinType) { + let (big, small) = create_big_and_small(); + + let join = Arc::new( + NestedLoopJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + nl_join_filter(), + &join_type, + None, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join, &ConfigOptions::new()) + .unwrap(); + + let swapping_projection = optimized_join + .as_any() + .downcast_ref::() + .expect("A proj is required to swap columns back to their original order"); + + assert_eq!(swapping_projection.expr().len(), 2); + let (col, name) = &swapping_projection.expr()[0]; + assert_eq!(name, "big_col"); + assert_col_expr(col, "big_col", 1); + let (col, name) = &swapping_projection.expr()[1]; + assert_eq!(name, "small_col"); + assert_col_expr(col, "small_col", 0); + + let swapped_join = swapping_projection + .input() + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + // Assert join side of big_col swapped in filter expression + let swapped_filter = swapped_join.filter().unwrap(); + let swapped_big_col_idx = swapped_filter.schema().index_of("big_col").unwrap(); + let swapped_big_col_side = swapped_filter + .column_indices() + .get(swapped_big_col_idx) + .unwrap() + .side; + assert_eq!( + swapped_big_col_side, + JoinSide::Right, + "Filter column side should be swapped" + ); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); +} + +#[rstest( + join_type, + case::left_semi(JoinType::LeftSemi), + case::left_anti(JoinType::LeftAnti), + case::right_semi(JoinType::RightSemi), + case::right_anti(JoinType::RightAnti) +)] +#[tokio::test] +async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { + let (big, small) 
= create_big_and_small(); + + let join = Arc::new( + NestedLoopJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + nl_join_filter(), + &join_type, + None, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize( + Arc::::clone(&join), + &ConfigOptions::new(), + ) + .unwrap(); + + let swapped_join = optimized_join + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + // Assert before/after schemas are equal + assert_eq!( + join.schema(), + swapped_join.schema(), + "Join schema should not be modified while optimization" + ); + + // Assert join side of big_col swapped in filter expression + let swapped_filter = swapped_join.filter().unwrap(); + let swapped_big_col_idx = swapped_filter.schema().index_of("big_col").unwrap(); + let swapped_big_col_side = swapped_filter + .column_indices() + .get(swapped_big_col_idx) + .unwrap() + .side; + assert_eq!( + swapped_big_col_side, + JoinSide::Right, + "Filter column side should be swapped" + ); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); +} + +#[rstest( + join_type, projection, small_on_right, + case::inner(JoinType::Inner, vec![1], true), + case::left(JoinType::Left, vec![1], true), + case::right(JoinType::Right, vec![1], true), + case::full(JoinType::Full, vec![1], true), + case::left_anti(JoinType::LeftAnti, vec![0], false), + case::left_semi(JoinType::LeftSemi, vec![0], false), + case::right_anti(JoinType::RightAnti, vec![0], true), + case::right_semi(JoinType::RightSemi, vec![0], true), + )] +#[tokio::test] +async fn test_hash_join_swap_on_joins_with_projections( + join_type: JoinType, + projection: Vec, + small_on_right: bool, +) -> Result<()> { + let (big, small) = create_big_and_small(); + + let left = if small_on_right { &big } else { &small }; + let right = if small_on_right { &small } else { &big }; + + let left_on = if small_on_right { + "big_col" + } else { + "small_col" + }; + let right_on = if small_on_right { + "small_col" + } else { + "big_col" + }; + + let join = Arc::new(HashJoinExec::try_new( + Arc::clone(left), + Arc::clone(right), + vec![( + Arc::new(Column::new_with_schema(left_on, &left.schema())?), + Arc::new(Column::new_with_schema(right_on, &right.schema())?), + )], + None, + &join_type, + Some(projection), + PartitionMode::Partitioned, + false, + )?); + + let swapped = join + .swap_inputs(PartitionMode::Partitioned) + .expect("swap_hash_join must support joins with projections"); + let swapped_join = swapped.as_any().downcast_ref::().expect( + "ProjectionExec won't be added above if HashJoinExec contains embedded projection", + ); + + assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!(swapped.schema().fields.len(), 1); + assert_eq!(swapped.schema().fields[0].name(), "small_col"); + Ok(()) +} + +fn assert_col_expr(expr: &Arc, name: &str, index: usize) { + let col = expr + .as_any() + .downcast_ref::() + .expect("Projection items should be Column expression"); + assert_eq!(col.name(), name); + assert_eq!(col.index(), index); +} + +#[tokio::test] +async fn test_join_selection_collect_left() { + let big = Arc::new(StatisticsExec::new( + big_statistics(), + Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), + )); + + let small = Arc::new(StatisticsExec::new( + small_statistics(), + Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), + 
)); + + let empty = Arc::new(StatisticsExec::new( + empty_statistics(), + Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]), + )); + + let join_on = vec![( + col("small_col", &small.schema()).unwrap(), + col("big_col", &big.schema()).unwrap(), + )]; + check_join_partition_mode( + Arc::::clone(&small), + Arc::::clone(&big), + join_on, + false, + PartitionMode::CollectLeft, + ); + + let join_on = vec![( + col("big_col", &big.schema()).unwrap(), + col("small_col", &small.schema()).unwrap(), + )]; + check_join_partition_mode( + big, + Arc::::clone(&small), + join_on, + true, + PartitionMode::CollectLeft, + ); + + let join_on = vec![( + col("small_col", &small.schema()).unwrap(), + col("empty_col", &empty.schema()).unwrap(), + )]; + check_join_partition_mode( + Arc::::clone(&small), + Arc::::clone(&empty), + join_on, + false, + PartitionMode::CollectLeft, + ); + + let join_on = vec![( + col("empty_col", &empty.schema()).unwrap(), + col("small_col", &small.schema()).unwrap(), + )]; + check_join_partition_mode(empty, small, join_on, true, PartitionMode::CollectLeft); +} + +#[tokio::test] +async fn test_join_selection_partitioned() { + let bigger = Arc::new(StatisticsExec::new( + bigger_statistics(), + Schema::new(vec![Field::new("bigger_col", DataType::Int32, false)]), + )); + + let big = Arc::new(StatisticsExec::new( + big_statistics(), + Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), + )); + + let empty = Arc::new(StatisticsExec::new( + empty_statistics(), + Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]), + )); + + let join_on = vec![( + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("bigger_col", &bigger.schema()).unwrap()) as _, + )]; + check_join_partition_mode( + Arc::::clone(&big), + Arc::::clone(&bigger), + join_on, + false, + PartitionMode::Partitioned, + ); + + let join_on = vec![( + Arc::new(Column::new_with_schema("bigger_col", &bigger.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, + )]; + check_join_partition_mode( + bigger, + Arc::::clone(&big), + join_on, + true, + PartitionMode::Partitioned, + ); + + let join_on = vec![( + Arc::new(Column::new_with_schema("empty_col", &empty.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, + )]; + check_join_partition_mode( + Arc::::clone(&empty), + Arc::::clone(&big), + join_on, + false, + PartitionMode::Partitioned, + ); + + let join_on = vec![( + Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("empty_col", &empty.schema()).unwrap()) as _, + )]; + check_join_partition_mode(big, empty, join_on, false, PartitionMode::Partitioned); +} + +fn check_join_partition_mode( + left: Arc, + right: Arc, + on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + is_swapped: bool, + expected_mode: PartitionMode, +) { + let join = Arc::new( + HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Auto, + false, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join, &ConfigOptions::new()) + .unwrap(); + + if !is_swapped { + let swapped_join = optimized_join + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + assert_eq!(*swapped_join.partition_mode(), expected_mode); + } else { + let swapping_projection = optimized_join + .as_any() + .downcast_ref::() + .expect("A proj is 
required to swap columns back to their original order"); + let swapped_join = swapping_projection + .input() + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + assert_eq!(*swapped_join.partition_mode(), expected_mode); + } +} + +#[derive(Debug)] +struct UnboundedStream { + batch_produce: Option, + count: usize, + batch: RecordBatch, +} + +impl Stream for UnboundedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(val) = self.batch_produce { + if val <= self.count { + return Poll::Ready(None); + } + } + self.count += 1; + Poll::Ready(Some(Ok(self.batch.clone()))) + } +} + +impl RecordBatchStream for UnboundedStream { + fn schema(&self) -> SchemaRef { + self.batch.schema() + } +} + +/// A mock execution plan that simply returns the provided data source characteristic +#[derive(Debug, Clone)] +pub struct UnboundedExec { + batch_produce: Option, + batch: RecordBatch, + cache: PlanProperties, +} + +impl UnboundedExec { + /// Create new exec that clones the given record batch to its output. + /// + /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition. + pub fn new( + batch_produce: Option, + batch: RecordBatch, + partitions: usize, + ) -> Self { + let cache = Self::compute_properties(batch.schema(), batch_produce, partitions); + Self { + batch_produce, + batch, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + schema: SchemaRef, + batch_produce: Option, + n_partitions: usize, + ) -> PlanProperties { + let boundedness = if batch_produce.is_none() { + Boundedness::Unbounded { + requires_infinite_memory: false, + } + } else { + Boundedness::Bounded + }; + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(n_partitions), + EmissionType::Incremental, + boundedness, + ) + } +} + +impl DisplayAs for UnboundedExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "UnboundedExec: unbounded={}", + self.batch_produce.is_none(), + ) + } + } + } +} + +impl ExecutionPlan for UnboundedExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(UnboundedStream { + batch_produce: self.batch_produce, + count: 0, + batch: self.batch.clone(), + })) + } +} + +#[derive(Eq, PartialEq, Debug)] +pub enum SourceType { + Unbounded, + Bounded, +} + +/// A mock execution plan that simply returns the provided statistics +#[derive(Debug, Clone)] +pub struct StatisticsExec { + stats: Statistics, + schema: Arc, + cache: PlanProperties, +} + +impl StatisticsExec { + pub fn new(stats: Statistics, schema: Schema) -> Self { + assert_eq!( + stats.column_statistics.len(), schema.fields().len(), + "if defined, the column statistics vector length should be the number of fields" + ); + let cache = Self::compute_properties(Arc::new(schema.clone())); + Self { + stats, + schema: Arc::new(schema), + cache, + } + } + + /// This 
function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(2), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for StatisticsExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "StatisticsExec: col_count={}, row_count={:?}", + self.schema.fields().len(), + self.stats.num_rows, + ) + } + } + } +} + +impl ExecutionPlan for StatisticsExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!("This plan only serves for testing statistics") + } + + fn statistics(&self) -> Result { + Ok(self.stats.clone()) + } +} + +#[test] +fn check_expr_supported() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + let supported_expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Column::new("a", 0)), + )) as Arc; + assert!(check_support(&supported_expr, &schema)); + let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc; + assert!(check_support(&supported_expr_2, &schema)); + let unsupported_expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Or, + Arc::new(Column::new("a", 0)), + )) as Arc; + assert!(!check_support(&unsupported_expr, &schema)); + let unsupported_expr_2 = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Or, + Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))), + )) as Arc; + assert!(!check_support(&unsupported_expr_2, &schema)); +} + +struct TestCase { + case: String, + initial_sources_unbounded: (SourceType, SourceType), + initial_join_type: JoinType, + initial_mode: PartitionMode, + expected_sources_unbounded: (SourceType, SourceType), + expected_join_type: JoinType, + expected_mode: PartitionMode, + expecting_swap: bool, +} + +#[tokio::test] +async fn test_join_with_swap_full() -> Result<()> { + // NOTE: Currently, some initial conditions are not viable after join order selection. + // For example, full join always comes in partitioned mode. See the warning in + // function "swap". If this changes in the future, we should update these tests. 
+ let cases = vec![ + TestCase { + case: "Bounded - Unbounded 1".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + initial_join_type: JoinType::Full, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: JoinType::Full, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }, + TestCase { + case: "Unbounded - Bounded 2".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + initial_join_type: JoinType::Full, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + expected_join_type: JoinType::Full, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }, + TestCase { + case: "Bounded - Bounded 3".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + initial_join_type: JoinType::Full, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + expected_join_type: JoinType::Full, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }, + TestCase { + case: "Unbounded - Unbounded 4".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + initial_join_type: JoinType::Full, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + expected_join_type: JoinType::Full, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }, + ]; + for case in cases.into_iter() { + test_join_with_maybe_swap_unbounded_case(case).await? + } + Ok(()) +} + +#[tokio::test] +async fn test_cases_without_collect_left_check() -> Result<()> { + let mut cases = vec![]; + let join_types = vec![JoinType::LeftSemi, JoinType::Inner]; + for join_type in join_types { + cases.push(TestCase { + case: "Unbounded - Bounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type.swap(), + expected_mode: PartitionMode::CollectLeft, + expecting_swap: true, + }); + cases.push(TestCase { + case: "Bounded - Unbounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Unbounded - Unbounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Bounded - Bounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + expected_join_type: join_type, + 
expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Unbounded - Bounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type.swap(), + expected_mode: PartitionMode::Partitioned, + expecting_swap: true, + }); + cases.push(TestCase { + case: "Bounded - Unbounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Bounded - Bounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Unbounded - Unbounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + } + + for case in cases.into_iter() { + test_join_with_maybe_swap_unbounded_case(case).await? 
+ } + Ok(()) +} + +#[tokio::test] +async fn test_not_support_collect_left() -> Result<()> { + let mut cases = vec![]; + // After [JoinSelection] optimization, these join types cannot run in CollectLeft mode except + // [JoinType::LeftSemi] + let the_ones_not_support_collect_left = vec![JoinType::Left, JoinType::LeftAnti]; + for join_type in the_ones_not_support_collect_left { + cases.push(TestCase { + case: "Unbounded - Bounded".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type.swap(), + expected_mode: PartitionMode::Partitioned, + expecting_swap: true, + }); + cases.push(TestCase { + case: "Bounded - Unbounded".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Bounded - Bounded".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Unbounded - Unbounded".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + } + + for case in cases.into_iter() { + test_join_with_maybe_swap_unbounded_case(case).await? + } + Ok(()) +} + +#[tokio::test] +async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> { + let mut cases = vec![]; + let the_ones_not_support_collect_left = + vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi]; + for join_type in the_ones_not_support_collect_left { + // We expect that (SourceType::Unbounded, SourceType::Bounded) will change, regardless of the + // statistics. + cases.push(TestCase { + case: "Unbounded - Bounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + expected_join_type: join_type, + expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + // We expect that (SourceType::Bounded, SourceType::Unbounded) will stay same, regardless of the + // statistics. 
+ cases.push(TestCase { + case: "Bounded - Unbounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Unbounded - Unbounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + // + cases.push(TestCase { + case: "Bounded - Bounded / CollectLeft".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::CollectLeft, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + expected_join_type: join_type, + expected_mode: PartitionMode::CollectLeft, + expecting_swap: false, + }); + // If cases are partitioned, only unbounded & bounded check will affect the order. + cases.push(TestCase { + case: "Unbounded - Bounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Bounded - Unbounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Bounded - Bounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + cases.push(TestCase { + case: "Unbounded - Unbounded / Partitioned".to_string(), + initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + initial_join_type: join_type, + initial_mode: PartitionMode::Partitioned, + expected_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), + expected_join_type: join_type, + expected_mode: PartitionMode::Partitioned, + expecting_swap: false, + }); + } + + for case in cases.into_iter() { + test_join_with_maybe_swap_unbounded_case(case).await? 
+ } + Ok(()) +} + +async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { + let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded; + let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded; + let left_exec = Arc::new(UnboundedExec::new( + (!left_unbounded).then_some(1), + RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Int32, + false, + )]))), + 2, + )) as _; + let right_exec = Arc::new(UnboundedExec::new( + (!right_unbounded).then_some(1), + RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new( + "b", + DataType::Int32, + false, + )]))), + 2, + )) as _; + + let join = Arc::new(HashJoinExec::try_new( + Arc::clone(&left_exec), + Arc::clone(&right_exec), + vec![( + col("a", &left_exec.schema())?, + col("b", &right_exec.schema())?, + )], + None, + &t.initial_join_type, + None, + t.initial_mode, + false, + )?) as _; + + let optimized_join_plan = hash_join_swap_subrule(join, &ConfigOptions::new())?; + + // If swap did happen + let projection_added = optimized_join_plan.as_any().is::(); + let plan = if projection_added { + let proj = optimized_join_plan + .as_any() + .downcast_ref::() + .expect("A proj is required to swap columns back to their original order"); + Arc::::clone(proj.input()) + } else { + optimized_join_plan + }; + + if let Some(HashJoinExec { + left, + right, + join_type, + mode, + .. + }) = plan.as_any().downcast_ref::() + { + let left_changed = Arc::ptr_eq(left, &right_exec); + let right_changed = Arc::ptr_eq(right, &left_exec); + // If this is not equal, we have a bigger problem. + assert_eq!(left_changed, right_changed); + assert_eq!( + ( + t.case.as_str(), + if left.boundedness().is_unbounded() { + SourceType::Unbounded + } else { + SourceType::Bounded + }, + if right.boundedness().is_unbounded() { + SourceType::Unbounded + } else { + SourceType::Bounded + }, + join_type, + mode, + left_changed && right_changed + ), + ( + t.case.as_str(), + t.expected_sources_unbounded.0, + t.expected_sources_unbounded.1, + &t.expected_join_type, + &t.expected_mode, + t.expecting_swap + ) + ); + }; + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs new file mode 100644 index 000000000000..49490b2a3d48 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs @@ -0,0 +1,490 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::BinaryExpr; +use datafusion_physical_expr::expressions::{col, lit}; +use datafusion_physical_expr::{Partitioning, PhysicalSortExpr}; +use datafusion_physical_optimizer::limit_pushdown::LimitPushdown; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::empty::EmptyExec; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; +use datafusion_physical_plan::{get_plan_string, ExecutionPlan, ExecutionPlanProperties}; + +fn create_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c2", DataType::Int32, true), + Field::new("c3", DataType::Int32, true), + ])) +} + +fn streaming_table_exec(schema: SchemaRef) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(StreamingTableExec::try_new( + Arc::clone(&schema), + vec![Arc::new(DummyStreamPartition { schema }) as _], + None, + None, + true, + None, + )?)) +} + +fn global_limit_exec( + input: Arc<dyn ExecutionPlan>, + skip: usize, + fetch: Option<usize>, +) -> Arc<dyn ExecutionPlan> { + Arc::new(GlobalLimitExec::new(input, skip, fetch)) +} + +fn local_limit_exec( + input: Arc<dyn ExecutionPlan>, + fetch: usize, +) -> Arc<dyn ExecutionPlan> { + Arc::new(LocalLimitExec::new(input, fetch)) +} + +fn sort_exec( + sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>, + input: Arc<dyn ExecutionPlan>, +) -> Arc<dyn ExecutionPlan> { + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new(SortExec::new(sort_exprs, input)) +} + +fn sort_preserving_merge_exec( + sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>, + input: Arc<dyn ExecutionPlan>, +) -> Arc<dyn ExecutionPlan> { + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new(SortPreservingMergeExec::new(sort_exprs, input)) +} + +fn projection_exec( + schema: SchemaRef, + input: Arc<dyn ExecutionPlan>, +) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(ProjectionExec::try_new( + vec![ + (col("c1", schema.as_ref()).unwrap(), "c1".to_string()), + (col("c2", schema.as_ref()).unwrap(), "c2".to_string()), + (col("c3", schema.as_ref()).unwrap(), "c3".to_string()), + ], + input, + )?)) +} + +fn filter_exec( + schema: SchemaRef, + input: Arc<dyn ExecutionPlan>, +) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(FilterExec::try_new( + Arc::new(BinaryExpr::new( + col("c3", schema.as_ref()).unwrap(), + Operator::Gt, + lit(0), + )), + input, + )?)) +} + +fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { + Arc::new(CoalesceBatchesExec::new(input, 8192)) +} + +fn coalesce_partitions_exec( + local_limit: Arc<dyn ExecutionPlan>, +) -> Arc<dyn ExecutionPlan> { + Arc::new(CoalescePartitionsExec::new(local_limit)) +} + +fn repartition_exec( + streaming_table: Arc<dyn ExecutionPlan>, +) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(RepartitionExec::try_new( + streaming_table, + Partitioning::RoundRobinBatch(8), + )?)) +} + +fn empty_exec(schema: SchemaRef) -> Arc<dyn ExecutionPlan> { + Arc::new(EmptyExec::new(schema)) +} + +#[derive(Debug)] +struct DummyStreamPartition { + schema: SchemaRef, +} +impl PartitionStream for
DummyStreamPartition { + fn schema(&self) -> &SchemaRef { + &self.schema + } + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + unreachable!() + } +} + +#[test] +fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero() -> Result<()> +{ + let schema = create_schema(); + let streaming_table = streaming_table_exec(schema)?; + let global_limit = global_limit_exec(streaming_table, 0, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=0, fetch=5", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_limit_when_skip_is_nonzero( +) -> Result<()> { + let schema = create_schema(); + let streaming_table = streaming_table_exec(schema)?; + let global_limit = global_limit_exec(streaming_table, 2, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=2, fetch=5", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "GlobalLimitExec: skip=2, fetch=5", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=7" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limit( +) -> Result<()> { + let schema = create_schema(); + let streaming_table = streaming_table_exec(Arc::clone(&schema))?; + let repartition = repartition_exec(streaming_table)?; + let filter = filter_exec(schema, repartition)?; + let coalesce_batches = coalesce_batches_exec(filter); + let local_limit = local_limit_exec(coalesce_batches, 5); + let coalesce_partitions = coalesce_partitions_exec(local_limit); + let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=0, fetch=5", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=5", + " CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: c3@2 > 0", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "GlobalLimitExec: skip=0, fetch=5", + " CoalescePartitionsExec", + " CoalesceBatchesExec: target_batch_size=8192, fetch=5", + " FilterExec: c3@2 > 0", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { + let schema = create_schema(); + let streaming_table = 
streaming_table_exec(Arc::clone(&schema))?; + let filter = filter_exec(Arc::clone(&schema), streaming_table)?; + let projection = projection_exec(schema, filter)?; + let global_limit = global_limit_exec(projection, 0, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=0, fetch=5", + " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", + " FilterExec: c3@2 > 0", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", + " GlobalLimitExec: skip=0, fetch=5", + " FilterExec: c3@2 > 0", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batches_exec_into_fetching_version( +) -> Result<()> { + let schema = create_schema(); + let streaming_table = streaming_table_exec(Arc::clone(&schema)).unwrap(); + let coalesce_batches = coalesce_batches_exec(streaming_table); + let projection = projection_exec(schema, coalesce_batches)?; + let global_limit = global_limit_exec(projection, 0, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=0, fetch=5", + " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", + " CoalesceBatchesExec: target_batch_size=8192", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", + " CoalesceBatchesExec: target_batch_size=8192, fetch=5", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { + let schema = create_schema(); + let streaming_table = streaming_table_exec(Arc::clone(&schema)).unwrap(); + let coalesce_batches = coalesce_batches_exec(streaming_table); + let projection = projection_exec(Arc::clone(&schema), coalesce_batches)?; + let repartition = repartition_exec(projection)?; + let sort = sort_exec( + vec![PhysicalSortExpr { + expr: col("c1", &schema)?, + options: SortOptions::default(), + }], + repartition, + ); + let spm = sort_preserving_merge_exec(sort.output_ordering().unwrap().to_vec(), sort); + let global_limit = global_limit_exec(spm, 0, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=0, fetch=5", + " SortPreservingMergeExec: [c1@0 ASC]", + " SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", + " CoalesceBatchesExec: target_batch_size=8192", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "SortPreservingMergeExec: 
[c1@0 ASC], fetch=5", + " SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", + " CoalesceBatchesExec: target_batch_size=8192", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> Result<()> +{ + let schema = create_schema(); + let streaming_table = streaming_table_exec(Arc::clone(&schema))?; + let repartition = repartition_exec(streaming_table)?; + let filter = filter_exec(schema, repartition)?; + let coalesce_partitions = coalesce_partitions_exec(filter); + let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5)); + + let initial = get_plan_string(&global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=0, fetch=5", + " CoalescePartitionsExec", + " FilterExec: c3@2 > 0", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = [ + "GlobalLimitExec: skip=0, fetch=5", + " CoalescePartitionsExec", + " FilterExec: c3@2 > 0", + " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn merges_local_limit_with_local_limit() -> Result<()> { + let schema = create_schema(); + let empty_exec = empty_exec(schema); + let child_local_limit = local_limit_exec(empty_exec, 10); + let parent_local_limit = local_limit_exec(child_local_limit, 20); + + let initial = get_plan_string(&parent_local_limit); + let expected_initial = [ + "LocalLimitExec: fetch=20", + " LocalLimitExec: fetch=10", + " EmptyExec", + ]; + + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(parent_local_limit, &ConfigOptions::new())?; + + let expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn merges_global_limit_with_global_limit() -> Result<()> { + let schema = create_schema(); + let empty_exec = empty_exec(schema); + let child_global_limit = global_limit_exec(empty_exec, 10, Some(30)); + let parent_global_limit = global_limit_exec(child_global_limit, 10, Some(20)); + + let initial = get_plan_string(&parent_global_limit); + let expected_initial = [ + "GlobalLimitExec: skip=10, fetch=20", + " GlobalLimitExec: skip=10, fetch=30", + " EmptyExec", + ]; + + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(parent_global_limit, &ConfigOptions::new())?; + + let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn merges_global_limit_with_local_limit() -> Result<()> { + let schema = create_schema(); + let empty_exec = empty_exec(schema); + let local_limit = local_limit_exec(empty_exec, 40); + let global_limit = global_limit_exec(local_limit, 20, Some(30)); + + let initial = get_plan_string(&global_limit); + let 
expected_initial = [ + "GlobalLimitExec: skip=20, fetch=30", + " LocalLimitExec: fetch=40", + " EmptyExec", + ]; + + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + + let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn merges_local_limit_with_global_limit() -> Result<()> { + let schema = create_schema(); + let empty_exec = empty_exec(schema); + let global_limit = global_limit_exec(empty_exec, 20, Some(30)); + let local_limit = local_limit_exec(global_limit, 20); + + let initial = get_plan_string(&local_limit); + let expected_initial = [ + "LocalLimitExec: fetch=20", + " GlobalLimitExec: skip=20, fetch=30", + " EmptyExec", + ]; + + assert_eq!(initial, expected_initial); + + let after_optimize = + LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?; + + let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs index 7c04d1239bc8..565cee47e3b9 100644 --- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -19,7 +19,10 @@ use std::sync::Arc; -use crate::physical_optimizer::parquet_exec_with_sort; +use crate::physical_optimizer::test_utils::{ + assert_plan_matches_expected, build_group_by, mock_data, parquet_exec_with_sort, + schema, TestAggregate, +}; use arrow::{compute::SortOptions, util::pretty::pretty_format_batches}; use arrow_schema::DataType; @@ -30,9 +33,6 @@ use datafusion_expr::Operator; use datafusion_physical_expr::expressions::cast; use datafusion_physical_expr::{expressions, expressions::col, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_optimizer::test_utils::{ - assert_plan_matches_expected, build_group_by, mock_data, schema, TestAggregate, -}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode}, collect, diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 19da0ef7855e..1cf8ce6007d0 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -21,36 +21,9 @@ mod aggregate_statistics; mod combine_partial_final_agg; mod enforce_distribution; mod enforce_sorting; +mod join_selection; +mod limit_pushdown; mod limited_distinct_aggregation; mod replace_with_order_preserving_variants; mod sanity_checker; - -use std::sync::Arc; - -use arrow_schema::SchemaRef; -use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; -use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_optimizer::test_utils::schema; - -/// Create a non sorted parquet exec -pub fn parquet_exec(schema: &SchemaRef) -> Arc { - ParquetExec::builder( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) - .with_file(PartitionedFile::new("x".to_string(), 100)), - ) - .build_arc() -} - -/// Create a single parquet file that is sorted -pub(crate) fn parquet_exec_with_sort( - output_ordering: 
Vec, -) -> Arc { - ParquetExec::builder( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_output_ordering(output_ordering), - ) - .build_arc() -} +mod test_utils; diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index 912683083738..52cd5e5754fa 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -17,6 +17,10 @@ use std::sync::Arc; +use crate::physical_optimizer::test_utils::{ + check_integrity, stream_exec_ordered_with_projection, +}; + use datafusion::prelude::SessionContext; use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; @@ -40,7 +44,6 @@ use datafusion_common::Result; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; -use datafusion_physical_optimizer::test_utils::{check_integrity, stream_exec_ordered_with_projection}; use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants, OrderPreservationContext}; use datafusion_common::config::ConfigOptions; diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 7f723ae67e8e..3057ca819e82 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -15,25 +15,27 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + +use crate::physical_optimizer::test_utils::{ + bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, + repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, +}; + use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; use datafusion::prelude::{CsvReadOptions, SessionContext}; -use datafusion_common::{JoinType, Result}; -use std::sync::Arc; - -use async_trait::async_trait; use datafusion_common::config::ConfigOptions; +use datafusion_common::{JoinType, Result}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::Partitioning; use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; -use datafusion_physical_optimizer::test_utils::{ - bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, - repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, -}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::{displayable, ExecutionPlan}; +use async_trait::async_trait; + async fn register_current_csv( ctx: &SessionContext, table_name: &str, diff --git a/datafusion/physical-optimizer/src/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs similarity index 94% rename from datafusion/physical-optimizer/src/test_utils.rs rename to datafusion/core/tests/physical_optimizer/test_utils.rs index 56830d3b3f89..6f0a4e46cd9e 100644 --- a/datafusion/physical-optimizer/src/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -17,15 +17,20 @@ //! 
Test utilities for physical optimizer tests -use crate::limited_distinct_aggregation::LimitedDistinctAggregation; -use crate::PhysicalOptimizerRule; +use std::any::Any; +use std::fmt::Formatter; +use std::sync::Arc; + use arrow::array::Int32Array; use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_common::{JoinType, Result}; +use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; use datafusion_functions_aggregate::count::count_udaf; @@ -34,6 +39,8 @@ use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{expressions, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -56,9 +63,27 @@ use datafusion_physical_plan::{ displayable, DisplayAs, DisplayFormatType, PlanProperties, }; use datafusion_physical_plan::{InputOrderMode, Partitioning}; -use std::any::Any; -use std::fmt::Formatter; -use std::sync::Arc; + +/// Create a non sorted parquet exec +pub fn parquet_exec(schema: &SchemaRef) -> Arc { + ParquetExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone()) + .with_file(PartitionedFile::new("x".to_string(), 100)), + ) + .build_arc() +} + +/// Create a single parquet file that is sorted +pub(crate) fn parquet_exec_with_sort( + output_ordering: Vec, +) -> Arc { + ParquetExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(output_ordering), + ) + .build_arc() +} pub fn schema() -> SchemaRef { Arc::new(Schema::new(vec![ @@ -383,7 +408,7 @@ pub fn trim_plan_display(plan: &str) -> Vec<&str> { // construct a stream partition for test purposes #[derive(Debug)] -pub(crate) struct TestStreamPartition { +pub struct TestStreamPartition { pub schema: SchemaRef, } diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index a40827bda209..4dc9ac22f173 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -41,7 +41,6 @@ datafusion-common = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-expr-common = { workspace = true, default-features = true } -datafusion-functions-aggregate = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index d5f70938a7d4..03bfb6978890 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ 
b/datafusion/physical-optimizer/src/join_selection.rs @@ -526,7 +526,7 @@ fn hash_join_convert_symmetric_subrule( /// +--------------+ +--------------+ /// /// ``` -fn hash_join_swap_subrule( +pub fn hash_join_swap_subrule( mut input: Arc, _config_options: &ConfigOptions, ) -> Result> { @@ -589,1550 +589,4 @@ fn apply_subrules( Ok(Transformed::yes(input)) } -#[cfg(test)] -mod tests_statistical { - use super::*; - use util_tests::StatisticsExec; - - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{ - stats::Precision, ColumnStatistics, JoinType, ScalarValue, Statistics, - }; - use datafusion_expr::Operator; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::expressions::BinaryExpr; - use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; - use datafusion_physical_plan::displayable; - use datafusion_physical_plan::projection::ProjectionExec; - use rstest::rstest; - - /// Return statistics for empty table - fn empty_statistics() -> Statistics { - Statistics { - num_rows: Precision::Absent, - total_byte_size: Precision::Absent, - column_statistics: vec![ColumnStatistics::new_unknown()], - } - } - - /// Get table thresholds: (num_rows, byte_size) - fn get_thresholds() -> (usize, usize) { - let optimizer_options = ConfigOptions::new().optimizer; - ( - optimizer_options.hash_join_single_partition_threshold_rows, - optimizer_options.hash_join_single_partition_threshold, - ) - } - - /// Return statistics for small table - fn small_statistics() -> Statistics { - let (threshold_num_rows, threshold_byte_size) = get_thresholds(); - Statistics { - num_rows: Precision::Inexact(threshold_num_rows / 128), - total_byte_size: Precision::Inexact(threshold_byte_size / 128), - column_statistics: vec![ColumnStatistics::new_unknown()], - } - } - - /// Return statistics for big table - fn big_statistics() -> Statistics { - let (threshold_num_rows, threshold_byte_size) = get_thresholds(); - Statistics { - num_rows: Precision::Inexact(threshold_num_rows * 2), - total_byte_size: Precision::Inexact(threshold_byte_size * 2), - column_statistics: vec![ColumnStatistics::new_unknown()], - } - } - - /// Return statistics for big table - fn bigger_statistics() -> Statistics { - let (threshold_num_rows, threshold_byte_size) = get_thresholds(); - Statistics { - num_rows: Precision::Inexact(threshold_num_rows * 4), - total_byte_size: Precision::Inexact(threshold_byte_size * 4), - column_statistics: vec![ColumnStatistics::new_unknown()], - } - } - - fn create_big_and_small() -> (Arc, Arc) { - let big = Arc::new(StatisticsExec::new( - big_statistics(), - Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), - )); - - let small = Arc::new(StatisticsExec::new( - small_statistics(), - Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), - )); - (big, small) - } - - /// Create a column statistics vector for a single column - /// that has the given min/max/distinct_count properties. - /// - /// Given min/max will be mapped to a [`ScalarValue`] if - /// they are not `None`. 
- fn create_column_stats( - min: Option, - max: Option, - distinct_count: Option, - ) -> Vec { - vec![ColumnStatistics { - distinct_count: distinct_count - .map(Precision::Inexact) - .unwrap_or(Precision::Absent), - min_value: min - .map(|size| Precision::Inexact(ScalarValue::UInt64(Some(size)))) - .unwrap_or(Precision::Absent), - max_value: max - .map(|size| Precision::Inexact(ScalarValue::UInt64(Some(size)))) - .unwrap_or(Precision::Absent), - ..Default::default() - }] - } - - /// Create join filter for NLJoinExec with expression `big_col > small_col` - /// where both columns are 0-indexed and come from left and right inputs respectively - fn nl_join_filter() -> Option { - let column_indices = vec![ - ColumnIndex { - index: 0, - side: JoinSide::Left, - }, - ColumnIndex { - index: 0, - side: JoinSide::Right, - }, - ]; - let intermediate_schema = Schema::new(vec![ - Field::new("big_col", DataType::Int32, false), - Field::new("small_col", DataType::Int32, false), - ]); - let expression = Arc::new(BinaryExpr::new( - Arc::new(Column::new_with_schema("big_col", &intermediate_schema).unwrap()), - Operator::Gt, - Arc::new(Column::new_with_schema("small_col", &intermediate_schema).unwrap()), - )) as _; - Some(JoinFilter::new( - expression, - column_indices, - Arc::new(intermediate_schema), - )) - } - - /// Returns three plans with statistics of (min, max, distinct_count) - /// * big 100K rows @ (0, 50k, 50k) - /// * medium 10K rows @ (1k, 5k, 1k) - /// * small 1K rows @ (0, 100k, 1k) - fn create_nested_with_min_max() -> ( - Arc, - Arc, - Arc, - ) { - let big = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(100_000), - column_statistics: create_column_stats( - Some(0), - Some(50_000), - Some(50_000), - ), - total_byte_size: Precision::Absent, - }, - Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), - )); - - let medium = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(10_000), - column_statistics: create_column_stats( - Some(1000), - Some(5000), - Some(1000), - ), - total_byte_size: Precision::Absent, - }, - Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]), - )); - - let small = Arc::new(StatisticsExec::new( - Statistics { - num_rows: Precision::Inexact(1000), - column_statistics: create_column_stats( - Some(0), - Some(100_000), - Some(1000), - ), - total_byte_size: Precision::Absent, - }, - Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), - )); - - (big, medium, small) - } - - #[tokio::test] - async fn test_join_with_swap() { - let (big, small) = create_big_and_small(); - - let join = Arc::new( - HashJoinExec::try_new( - Arc::clone(&big), - Arc::clone(&small), - vec![( - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), - Arc::new( - Column::new_with_schema("small_col", &small.schema()).unwrap(), - ), - )], - None, - &JoinType::Left, - None, - PartitionMode::CollectLeft, - false, - ) - .unwrap(), - ); - - let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) - .unwrap(); - - let swapping_projection = optimized_join - .as_any() - .downcast_ref::() - .expect("A proj is required to swap columns back to their original order"); - - assert_eq!(swapping_projection.expr().len(), 2); - let (col, name) = &swapping_projection.expr()[0]; - assert_eq!(name, "big_col"); - assert_col_expr(col, "big_col", 1); - let (col, name) = &swapping_projection.expr()[1]; - assert_eq!(name, "small_col"); - assert_col_expr(col, "small_col", 0); - - let swapped_join = 
swapping_projection - .input() - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - - assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(8192) - ); - assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(2097152) - ); - } - - #[tokio::test] - async fn test_left_join_no_swap() { - let (big, small) = create_big_and_small(); - - let join = Arc::new( - HashJoinExec::try_new( - Arc::clone(&small), - Arc::clone(&big), - vec![( - Arc::new( - Column::new_with_schema("small_col", &small.schema()).unwrap(), - ), - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), - )], - None, - &JoinType::Left, - None, - PartitionMode::CollectLeft, - false, - ) - .unwrap(), - ); - - let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) - .unwrap(); - - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - - assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(8192) - ); - assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(2097152) - ); - } - - #[tokio::test] - async fn test_join_with_swap_semi() { - let join_types = [JoinType::LeftSemi, JoinType::LeftAnti]; - for join_type in join_types { - let (big, small) = create_big_and_small(); - - let join = HashJoinExec::try_new( - Arc::clone(&big), - Arc::clone(&small), - vec![( - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), - Arc::new( - Column::new_with_schema("small_col", &small.schema()).unwrap(), - ), - )], - None, - &join_type, - None, - PartitionMode::Partitioned, - false, - ) - .unwrap(); - - let original_schema = join.schema(); - - let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &ConfigOptions::new()) - .unwrap(); - - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect( - "A proj is not required to swap columns back to their original order", - ); - - assert_eq!(swapped_join.schema().fields().len(), 1); - assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(8192) - ); - assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(2097152) - ); - assert_eq!(original_schema, swapped_join.schema()); - } - } - - /// Compare the input plan with the plan after running the probe order optimizer. - macro_rules! 
assert_optimized { - ($EXPECTED_LINES: expr, $PLAN: expr) => { - let expected_lines = - $EXPECTED_LINES.iter().map(|s| *s).collect::>(); - - let plan = Arc::new($PLAN); - let optimized = JoinSelection::new() - .optimize(plan.clone(), &ConfigOptions::new()) - .unwrap(); - - let plan_string = displayable(optimized.as_ref()).indent(true).to_string(); - let actual_lines = plan_string.split("\n").collect::>(); - - assert_eq!( - &expected_lines, &actual_lines, - "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", - expected_lines, actual_lines - ); - }; - } - - #[tokio::test] - async fn test_nested_join_swap() { - let (big, medium, small) = create_nested_with_min_max(); - - // Form the inner join: big JOIN small - let child_join = HashJoinExec::try_new( - Arc::clone(&big), - Arc::clone(&small), - vec![( - col("big_col", &big.schema()).unwrap(), - col("small_col", &small.schema()).unwrap(), - )], - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - false, - ) - .unwrap(); - let child_schema = child_join.schema(); - - // Form join tree `medium LEFT JOIN (big JOIN small)` - let join = HashJoinExec::try_new( - Arc::clone(&medium), - Arc::new(child_join), - vec![( - col("medium_col", &medium.schema()).unwrap(), - col("small_col", &child_schema).unwrap(), - )], - None, - &JoinType::Left, - None, - PartitionMode::CollectLeft, - false, - ) - .unwrap(); - - // Hash join uses the left side to build the hash table, and right side to probe it. We want - // to keep left as small as possible, so if we can estimate (with a reasonable margin of error) - // that the left side is smaller than the right side, we should swap the sides. - // - // The first hash join's left is 'small' table (with 1000 rows), and the second hash join's - // left is the F(small IJ big) which has an estimated cardinality of 2000 rows (vs medium which - // has an exact cardinality of 10_000 rows). 
- let expected = [ - "ProjectionExec: expr=[medium_col@2 as medium_col, big_col@0 as big_col, small_col@1 as small_col]", - " HashJoinExec: mode=CollectLeft, join_type=Right, on=[(small_col@1, medium_col@0)]", - " ProjectionExec: expr=[big_col@1 as big_col, small_col@0 as small_col]", - " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(small_col@0, big_col@0)]", - " StatisticsExec: col_count=1, row_count=Inexact(1000)", - " StatisticsExec: col_count=1, row_count=Inexact(100000)", - " StatisticsExec: col_count=1, row_count=Inexact(10000)", - "", - ]; - assert_optimized!(expected, join); - } - - #[tokio::test] - async fn test_join_no_swap() { - let (big, small) = create_big_and_small(); - let join = Arc::new( - HashJoinExec::try_new( - Arc::clone(&small), - Arc::clone(&big), - vec![( - Arc::new( - Column::new_with_schema("small_col", &small.schema()).unwrap(), - ), - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()), - )], - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - false, - ) - .unwrap(), - ); - - let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) - .unwrap(); - - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - - assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(8192) - ); - assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(2097152) - ); - } - - #[rstest( - join_type, - case::inner(JoinType::Inner), - case::left(JoinType::Left), - case::right(JoinType::Right), - case::full(JoinType::Full) - )] - #[tokio::test] - async fn test_nl_join_with_swap(join_type: JoinType) { - let (big, small) = create_big_and_small(); - - let join = Arc::new( - NestedLoopJoinExec::try_new( - Arc::clone(&big), - Arc::clone(&small), - nl_join_filter(), - &join_type, - None, - ) - .unwrap(), - ); - - let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) - .unwrap(); - - let swapping_projection = optimized_join - .as_any() - .downcast_ref::() - .expect("A proj is required to swap columns back to their original order"); - - assert_eq!(swapping_projection.expr().len(), 2); - let (col, name) = &swapping_projection.expr()[0]; - assert_eq!(name, "big_col"); - assert_col_expr(col, "big_col", 1); - let (col, name) = &swapping_projection.expr()[1]; - assert_eq!(name, "small_col"); - assert_col_expr(col, "small_col", 0); - - let swapped_join = swapping_projection - .input() - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - - // Assert join side of big_col swapped in filter expression - let swapped_filter = swapped_join.filter().unwrap(); - let swapped_big_col_idx = swapped_filter.schema().index_of("big_col").unwrap(); - let swapped_big_col_side = swapped_filter - .column_indices() - .get(swapped_big_col_idx) - .unwrap() - .side; - assert_eq!( - swapped_big_col_side, - JoinSide::Right, - "Filter column side should be swapped" - ); - - assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(8192) - ); - assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(2097152) - ); - } - - #[rstest( - join_type, - case::left_semi(JoinType::LeftSemi), - case::left_anti(JoinType::LeftAnti), - case::right_semi(JoinType::RightSemi), - case::right_anti(JoinType::RightAnti) - )] - #[tokio::test] - async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { 
- let (big, small) = create_big_and_small(); - - let join = Arc::new( - NestedLoopJoinExec::try_new( - Arc::clone(&big), - Arc::clone(&small), - nl_join_filter(), - &join_type, - None, - ) - .unwrap(), - ); - - let optimized_join = JoinSelection::new() - .optimize( - Arc::::clone(&join), - &ConfigOptions::new(), - ) - .unwrap(); - - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - - // Assert before/after schemas are equal - assert_eq!( - join.schema(), - swapped_join.schema(), - "Join schema should not be modified while optimization" - ); - - // Assert join side of big_col swapped in filter expression - let swapped_filter = swapped_join.filter().unwrap(); - let swapped_big_col_idx = swapped_filter.schema().index_of("big_col").unwrap(); - let swapped_big_col_side = swapped_filter - .column_indices() - .get(swapped_big_col_idx) - .unwrap() - .side; - assert_eq!( - swapped_big_col_side, - JoinSide::Right, - "Filter column side should be swapped" - ); - - assert_eq!( - swapped_join.left().statistics().unwrap().total_byte_size, - Precision::Inexact(8192) - ); - assert_eq!( - swapped_join.right().statistics().unwrap().total_byte_size, - Precision::Inexact(2097152) - ); - } - - #[rstest( - join_type, projection, small_on_right, - case::inner(JoinType::Inner, vec![1], true), - case::left(JoinType::Left, vec![1], true), - case::right(JoinType::Right, vec![1], true), - case::full(JoinType::Full, vec![1], true), - case::left_anti(JoinType::LeftAnti, vec![0], false), - case::left_semi(JoinType::LeftSemi, vec![0], false), - case::right_anti(JoinType::RightAnti, vec![0], true), - case::right_semi(JoinType::RightSemi, vec![0], true), - )] - #[tokio::test] - async fn test_hash_join_swap_on_joins_with_projections( - join_type: JoinType, - projection: Vec, - small_on_right: bool, - ) -> Result<()> { - let (big, small) = create_big_and_small(); - - let left = if small_on_right { &big } else { &small }; - let right = if small_on_right { &small } else { &big }; - - let left_on = if small_on_right { - "big_col" - } else { - "small_col" - }; - let right_on = if small_on_right { - "small_col" - } else { - "big_col" - }; - - let join = Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - vec![( - Arc::new(Column::new_with_schema(left_on, &left.schema())?), - Arc::new(Column::new_with_schema(right_on, &right.schema())?), - )], - None, - &join_type, - Some(projection), - PartitionMode::Partitioned, - false, - )?); - - let swapped = join - .swap_inputs(PartitionMode::Partitioned) - .expect("swap_hash_join must support joins with projections"); - let swapped_join = swapped.as_any().downcast_ref::().expect( - "ProjectionExec won't be added above if HashJoinExec contains embedded projection", - ); - - assert_eq!(swapped_join.projection, Some(vec![0_usize])); - assert_eq!(swapped.schema().fields.len(), 1); - assert_eq!(swapped.schema().fields[0].name(), "small_col"); - Ok(()) - } - - fn assert_col_expr(expr: &Arc, name: &str, index: usize) { - let col = expr - .as_any() - .downcast_ref::() - .expect("Projection items should be Column expression"); - assert_eq!(col.name(), name); - assert_eq!(col.index(), index); - } - - #[tokio::test] - async fn test_join_selection_collect_left() { - let big = Arc::new(StatisticsExec::new( - big_statistics(), - Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), - )); - - let small = Arc::new(StatisticsExec::new( - small_statistics(), - Schema::new(vec![Field::new("small_col", 
DataType::Int32, false)]), - )); - - let empty = Arc::new(StatisticsExec::new( - empty_statistics(), - Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]), - )); - - let join_on = vec![( - col("small_col", &small.schema()).unwrap(), - col("big_col", &big.schema()).unwrap(), - )]; - check_join_partition_mode( - Arc::::clone(&small), - Arc::::clone(&big), - join_on, - false, - PartitionMode::CollectLeft, - ); - - let join_on = vec![( - col("big_col", &big.schema()).unwrap(), - col("small_col", &small.schema()).unwrap(), - )]; - check_join_partition_mode( - big, - Arc::::clone(&small), - join_on, - true, - PartitionMode::CollectLeft, - ); - - let join_on = vec![( - col("small_col", &small.schema()).unwrap(), - col("empty_col", &empty.schema()).unwrap(), - )]; - check_join_partition_mode( - Arc::::clone(&small), - Arc::::clone(&empty), - join_on, - false, - PartitionMode::CollectLeft, - ); - - let join_on = vec![( - col("empty_col", &empty.schema()).unwrap(), - col("small_col", &small.schema()).unwrap(), - )]; - check_join_partition_mode( - empty, - small, - join_on, - true, - PartitionMode::CollectLeft, - ); - } - - #[tokio::test] - async fn test_join_selection_partitioned() { - let bigger = Arc::new(StatisticsExec::new( - bigger_statistics(), - Schema::new(vec![Field::new("bigger_col", DataType::Int32, false)]), - )); - - let big = Arc::new(StatisticsExec::new( - big_statistics(), - Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), - )); - - let empty = Arc::new(StatisticsExec::new( - empty_statistics(), - Schema::new(vec![Field::new("empty_col", DataType::Int32, false)]), - )); - - let join_on = vec![( - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("bigger_col", &bigger.schema()).unwrap()) - as _, - )]; - check_join_partition_mode( - Arc::::clone(&big), - Arc::::clone(&bigger), - join_on, - false, - PartitionMode::Partitioned, - ); - - let join_on = vec![( - Arc::new(Column::new_with_schema("bigger_col", &bigger.schema()).unwrap()) - as _, - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, - )]; - check_join_partition_mode( - bigger, - Arc::::clone(&big), - join_on, - true, - PartitionMode::Partitioned, - ); - - let join_on = vec![( - Arc::new(Column::new_with_schema("empty_col", &empty.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, - )]; - check_join_partition_mode( - Arc::::clone(&empty), - Arc::::clone(&big), - join_on, - false, - PartitionMode::Partitioned, - ); - - let join_on = vec![( - Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()) as _, - Arc::new(Column::new_with_schema("empty_col", &empty.schema()).unwrap()) as _, - )]; - check_join_partition_mode(big, empty, join_on, false, PartitionMode::Partitioned); - } - - fn check_join_partition_mode( - left: Arc, - right: Arc, - on: Vec<(PhysicalExprRef, PhysicalExprRef)>, - is_swapped: bool, - expected_mode: PartitionMode, - ) { - let join = Arc::new( - HashJoinExec::try_new( - left, - right, - on, - None, - &JoinType::Inner, - None, - PartitionMode::Auto, - false, - ) - .unwrap(), - ); - - let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) - .unwrap(); - - if !is_swapped { - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - assert_eq!(*swapped_join.partition_mode(), expected_mode); - } else { - let swapping_projection = optimized_join - 
.as_any() - .downcast_ref::() - .expect( - "A proj is required to swap columns back to their original order", - ); - let swapped_join = swapping_projection - .input() - .as_any() - .downcast_ref::() - .expect("The type of the plan should not be changed"); - - assert_eq!(*swapped_join.partition_mode(), expected_mode); - } - } -} - -#[cfg(test)] -mod util_tests { - use std::{ - any::Any, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - }; - - use arrow::{ - array::RecordBatch, - datatypes::{DataType, Field, Schema, SchemaRef}, - }; - use datafusion_common::{Result, Statistics}; - use datafusion_execution::{ - RecordBatchStream, SendableRecordBatchStream, TaskContext, - }; - use datafusion_expr::Operator; - use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr}; - use datafusion_physical_expr::intervals::utils::check_support; - use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; - use datafusion_physical_plan::{ - execution_plan::{Boundedness, EmissionType}, - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, - }; - use futures::Stream; - - #[derive(Debug)] - struct UnboundedStream { - batch_produce: Option, - count: usize, - batch: RecordBatch, - } - - impl Stream for UnboundedStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - if let Some(val) = self.batch_produce { - if val <= self.count { - return Poll::Ready(None); - } - } - self.count += 1; - Poll::Ready(Some(Ok(self.batch.clone()))) - } - } - - impl RecordBatchStream for UnboundedStream { - fn schema(&self) -> SchemaRef { - self.batch.schema() - } - } - - /// A mock execution plan that simply returns the provided data source characteristic - #[derive(Debug, Clone)] - pub struct UnboundedExec { - batch_produce: Option, - batch: RecordBatch, - cache: PlanProperties, - } - - impl UnboundedExec { - /// Create new exec that clones the given record batch to its output. - /// - /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition. - pub fn new( - batch_produce: Option, - batch: RecordBatch, - partitions: usize, - ) -> Self { - let cache = - Self::compute_properties(batch.schema(), batch_produce, partitions); - Self { - batch_produce, - batch, - cache, - } - } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
- fn compute_properties( - schema: SchemaRef, - batch_produce: Option, - n_partitions: usize, - ) -> PlanProperties { - let boundedness = if batch_produce.is_none() { - Boundedness::Unbounded { - requires_infinite_memory: false, - } - } else { - Boundedness::Bounded - }; - PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(n_partitions), - EmissionType::Incremental, - boundedness, - ) - } - } - - impl DisplayAs for UnboundedExec { - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "UnboundedExec: unbounded={}", - self.batch_produce.is_none(), - ) - } - } - } - } - - impl ExecutionPlan for UnboundedExec { - fn name(&self) -> &'static str { - Self::static_name() - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - Ok(Box::pin(UnboundedStream { - batch_produce: self.batch_produce, - count: 0, - batch: self.batch.clone(), - })) - } - } - - #[derive(Eq, PartialEq, Debug)] - pub enum SourceType { - Unbounded, - Bounded, - } - - /// A mock execution plan that simply returns the provided statistics - #[derive(Debug, Clone)] - pub struct StatisticsExec { - stats: Statistics, - schema: Arc, - cache: PlanProperties, - } - - impl StatisticsExec { - pub fn new(stats: Statistics, schema: Schema) -> Self { - assert_eq!( - stats.column_statistics.len(), schema.fields().len(), - "if defined, the column statistics vector length should be the number of fields" - ); - let cache = Self::compute_properties(Arc::new(schema.clone())); - Self { - stats, - schema: Arc::new(schema), - cache, - } - } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
- fn compute_properties(schema: SchemaRef) -> PlanProperties { - PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(2), - EmissionType::Incremental, - Boundedness::Bounded, - ) - } - } - - impl DisplayAs for StatisticsExec { - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "StatisticsExec: col_count={}, row_count={:?}", - self.schema.fields().len(), - self.stats.num_rows, - ) - } - } - } - } - - impl ExecutionPlan for StatisticsExec { - fn name(&self) -> &'static str { - Self::static_name() - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!("This plan only serves for testing statistics") - } - - fn statistics(&self) -> Result { - Ok(self.stats.clone()) - } - } - - #[test] - fn check_expr_supported() { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), - ])); - let supported_expr = Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Plus, - Arc::new(Column::new("a", 0)), - )) as Arc; - assert!(check_support(&supported_expr, &schema)); - let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc; - assert!(check_support(&supported_expr_2, &schema)); - let unsupported_expr = Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Or, - Arc::new(Column::new("a", 0)), - )) as Arc; - assert!(!check_support(&unsupported_expr, &schema)); - let unsupported_expr_2 = Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Or, - Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))), - )) as Arc; - assert!(!check_support(&unsupported_expr_2, &schema)); - } -} - -#[cfg(test)] -mod hash_join_tests { - use super::*; - use util_tests::{SourceType, UnboundedExec}; - - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_plan::projection::ProjectionExec; - - struct TestCase { - case: String, - initial_sources_unbounded: (SourceType, SourceType), - initial_join_type: JoinType, - initial_mode: PartitionMode, - expected_sources_unbounded: (SourceType, SourceType), - expected_join_type: JoinType, - expected_mode: PartitionMode, - expecting_swap: bool, - } - - #[tokio::test] - async fn test_join_with_swap_full() -> Result<()> { - // NOTE: Currently, some initial conditions are not viable after join order selection. - // For example, full join always comes in partitioned mode. See the warning in - // function "swap". If this changes in the future, we should update these tests. 
- let cases = vec![ - TestCase { - case: "Bounded - Unbounded 1".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - initial_join_type: JoinType::Full, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: JoinType::Full, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }, - TestCase { - case: "Unbounded - Bounded 2".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - initial_join_type: JoinType::Full, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - expected_join_type: JoinType::Full, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }, - TestCase { - case: "Bounded - Bounded 3".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - initial_join_type: JoinType::Full, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - expected_join_type: JoinType::Full, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }, - TestCase { - case: "Unbounded - Unbounded 4".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), - initial_join_type: JoinType::Full, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: ( - SourceType::Unbounded, - SourceType::Unbounded, - ), - expected_join_type: JoinType::Full, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }, - ]; - for case in cases.into_iter() { - test_join_with_maybe_swap_unbounded_case(case).await? - } - Ok(()) - } - - #[tokio::test] - async fn test_cases_without_collect_left_check() -> Result<()> { - let mut cases = vec![]; - let join_types = vec![JoinType::LeftSemi, JoinType::Inner]; - for join_type in join_types { - cases.push(TestCase { - case: "Unbounded - Bounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type.swap(), - expected_mode: PartitionMode::CollectLeft, - expecting_swap: true, - }); - cases.push(TestCase { - case: "Bounded - Unbounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Unbounded - Unbounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: ( - SourceType::Unbounded, - SourceType::Unbounded, - ), - expected_join_type: join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Bounded - Bounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - expected_join_type: 
join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Unbounded - Bounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type.swap(), - expected_mode: PartitionMode::Partitioned, - expecting_swap: true, - }); - cases.push(TestCase { - case: "Bounded - Unbounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Bounded - Bounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Unbounded - Unbounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: ( - SourceType::Unbounded, - SourceType::Unbounded, - ), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - } - - for case in cases.into_iter() { - test_join_with_maybe_swap_unbounded_case(case).await? 
- } - Ok(()) - } - - #[tokio::test] - async fn test_not_support_collect_left() -> Result<()> { - let mut cases = vec![]; - // After [JoinSelection] optimization, these join types cannot run in CollectLeft mode except - // [JoinType::LeftSemi] - let the_ones_not_support_collect_left = vec![JoinType::Left, JoinType::LeftAnti]; - for join_type in the_ones_not_support_collect_left { - cases.push(TestCase { - case: "Unbounded - Bounded".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type.swap(), - expected_mode: PartitionMode::Partitioned, - expecting_swap: true, - }); - cases.push(TestCase { - case: "Bounded - Unbounded".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Bounded - Bounded".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Unbounded - Unbounded".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: ( - SourceType::Unbounded, - SourceType::Unbounded, - ), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - } - - for case in cases.into_iter() { - test_join_with_maybe_swap_unbounded_case(case).await? - } - Ok(()) - } - - #[tokio::test] - async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> { - let mut cases = vec![]; - let the_ones_not_support_collect_left = - vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi]; - for join_type in the_ones_not_support_collect_left { - // We expect that (SourceType::Unbounded, SourceType::Bounded) will change, regardless of the - // statistics. - cases.push(TestCase { - case: "Unbounded - Bounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - expected_join_type: join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - // We expect that (SourceType::Bounded, SourceType::Unbounded) will stay same, regardless of the - // statistics. 
- cases.push(TestCase { - case: "Bounded - Unbounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Unbounded - Unbounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: ( - SourceType::Unbounded, - SourceType::Unbounded, - ), - expected_join_type: join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - // - cases.push(TestCase { - case: "Bounded - Bounded / CollectLeft".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::CollectLeft, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - expected_join_type: join_type, - expected_mode: PartitionMode::CollectLeft, - expecting_swap: false, - }); - // If cases are partitioned, only unbounded & bounded check will affect the order. - cases.push(TestCase { - case: "Unbounded - Bounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Bounded - Unbounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Bounded - Bounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - cases.push(TestCase { - case: "Unbounded - Unbounded / Partitioned".to_string(), - initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded), - initial_join_type: join_type, - initial_mode: PartitionMode::Partitioned, - expected_sources_unbounded: ( - SourceType::Unbounded, - SourceType::Unbounded, - ), - expected_join_type: join_type, - expected_mode: PartitionMode::Partitioned, - expecting_swap: false, - }); - } - - for case in cases.into_iter() { - test_join_with_maybe_swap_unbounded_case(case).await? 
- }
- Ok(())
- }
-
- async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
- let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded;
- let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded;
- let left_exec = Arc::new(UnboundedExec::new(
- (!left_unbounded).then_some(1),
- RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
- "a",
- DataType::Int32,
- false,
- )]))),
- 2,
- )) as _;
- let right_exec = Arc::new(UnboundedExec::new(
- (!right_unbounded).then_some(1),
- RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
- "b",
- DataType::Int32,
- false,
- )]))),
- 2,
- )) as _;
-
- let join = Arc::new(HashJoinExec::try_new(
- Arc::clone(&left_exec),
- Arc::clone(&right_exec),
- vec![(
- col("a", &left_exec.schema())?,
- col("b", &right_exec.schema())?,
- )],
- None,
- &t.initial_join_type,
- None,
- t.initial_mode,
- false,
- )?) as _;
-
- let optimized_join_plan = hash_join_swap_subrule(join, &ConfigOptions::new())?;
-
- // If swap did happen
- let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
- let plan = if projection_added {
- let proj = optimized_join_plan
- .as_any()
- .downcast_ref::<ProjectionExec>()
- .expect(
- "A proj is required to swap columns back to their original order",
- );
- Arc::<dyn ExecutionPlan>::clone(proj.input())
- } else {
- optimized_join_plan
- };
-
- if let Some(HashJoinExec {
- left,
- right,
- join_type,
- mode,
- ..
- }) = plan.as_any().downcast_ref::<HashJoinExec>()
- {
- let left_changed = Arc::ptr_eq(left, &right_exec);
- let right_changed = Arc::ptr_eq(right, &left_exec);
- // If this is not equal, we have a bigger problem.
- assert_eq!(left_changed, right_changed);
- assert_eq!(
- (
- t.case.as_str(),
- if left.boundedness().is_unbounded() {
- SourceType::Unbounded
- } else {
- SourceType::Bounded
- },
- if right.boundedness().is_unbounded() {
- SourceType::Unbounded
- } else {
- SourceType::Bounded
- },
- join_type,
- mode,
- left_changed && right_changed
- ),
- (
- t.case.as_str(),
- t.expected_sources_unbounded.0,
- t.expected_sources_unbounded.1,
- &t.expected_join_type,
- &t.expected_mode,
- t.expecting_swap
- )
- );
- };
- Ok(())
- }
-}
+// See tests in datafusion/core/tests/physical_optimizer
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index bcb87944f5fd..e52e46a16375 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -32,6 +32,6 @@ pub mod pruning;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
-pub use optimizer::PhysicalOptimizerRule;
-pub mod test_utils;
 pub mod utils;
+
+pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 7a44b2e90dde..1c7e4d3d4c3d 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -22,6 +22,7 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use crate::PhysicalOptimizerRule;
+
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
@@ -339,480 +340,3 @@ fn add_global_limit(
 }
 
 // See tests in datafusion/core/tests/physical_optimizer
-
-#[cfg(test)]
-mod test {
- use super::*;
- use arrow::compute::SortOptions;
- use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_common::config::ConfigOptions;
- use datafusion_execution::{SendableRecordBatchStream, TaskContext};
- use datafusion_expr::Operator;
- use datafusion_physical_expr::expressions::BinaryExpr;
- use datafusion_physical_expr::expressions::{col, lit};
- use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
- use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
- use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
- use datafusion_physical_plan::empty::EmptyExec;
- use datafusion_physical_plan::filter::FilterExec;
- use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
- use datafusion_physical_plan::projection::ProjectionExec;
- use datafusion_physical_plan::repartition::RepartitionExec;
- use datafusion_physical_plan::sorts::sort::SortExec;
- use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
- use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
- use datafusion_physical_plan::{
- get_plan_string, ExecutionPlan, ExecutionPlanProperties,
- };
- use std::sync::Arc;
-
- #[derive(Debug)]
- struct DummyStreamPartition {
- schema: SchemaRef,
- }
- impl PartitionStream for DummyStreamPartition {
- fn schema(&self) -> &SchemaRef {
- &self.schema
- }
- fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
- unreachable!()
- }
- }
-
- #[test]
- fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero(
- ) -> Result<()> {
- let schema = create_schema();
- let streaming_table = streaming_table_exec(schema)?;
- let global_limit = global_limit_exec(streaming_table, 0, Some(5));
-
- let initial = get_plan_string(&global_limit);
- let expected_initial = [
- "GlobalLimitExec: skip=0, fetch=5",
- " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
- ];
- assert_eq!(initial, expected_initial);
-
- let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
- let expected = [
- "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5"
- ];
- assert_eq!(get_plan_string(&after_optimize), expected);
-
- Ok(())
- }
-
- #[test]
- fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_limit_when_skip_is_nonzero(
- ) -> Result<()> {
- let schema = create_schema();
- let streaming_table = streaming_table_exec(schema)?;
- let global_limit = global_limit_exec(streaming_table, 2, Some(5));
-
- let initial = get_plan_string(&global_limit);
- let expected_initial = [
- "GlobalLimitExec: skip=2, fetch=5",
- " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
- ];
- assert_eq!(initial, expected_initial);
-
- let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
- let expected = [
- "GlobalLimitExec: skip=2, fetch=5",
- " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=7"
- ];
- assert_eq!(get_plan_string(&after_optimize), expected);
-
- Ok(())
- }
-
- #[test]
- fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limit(
- ) -> Result<()> {
- let schema = create_schema();
- let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
- let repartition = repartition_exec(streaming_table)?;
- let filter = filter_exec(schema, repartition)?;
- let coalesce_batches = coalesce_batches_exec(filter);
- let local_limit = local_limit_exec(coalesce_batches, 5);
- let coalesce_partitions = coalesce_partitions_exec(local_limit);
- let global_limit =
global_limit_exec(coalesce_partitions, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " CoalescePartitionsExec", - " LocalLimitExec: fetch=5", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "GlobalLimitExec: skip=0, fetch=5", - " CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192, fetch=5", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { - let schema = create_schema(); - let streaming_table = streaming_table_exec(Arc::clone(&schema))?; - let filter = filter_exec(Arc::clone(&schema), streaming_table)?; - let projection = projection_exec(schema, filter)?; - let global_limit = global_limit_exec(projection, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " FilterExec: c3@2 > 0", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " GlobalLimitExec: skip=0, fetch=5", - " FilterExec: c3@2 > 0", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batches_exec_into_fetching_version( - ) -> Result<()> { - let schema = create_schema(); - let streaming_table = streaming_table_exec(Arc::clone(&schema)).unwrap(); - let coalesce_batches = coalesce_batches_exec(streaming_table); - let projection = projection_exec(schema, coalesce_batches)?; - let global_limit = global_limit_exec(projection, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192, fetch=5", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { - let schema = create_schema(); - let streaming_table = 
streaming_table_exec(Arc::clone(&schema)).unwrap(); - let coalesce_batches = coalesce_batches_exec(streaming_table); - let projection = projection_exec(Arc::clone(&schema), coalesce_batches)?; - let repartition = repartition_exec(projection)?; - let sort = sort_exec( - vec![PhysicalSortExpr { - expr: col("c1", &schema)?, - options: SortOptions::default(), - }], - repartition, - ); - let spm = - sort_preserving_merge_exec(sort.output_ordering().unwrap().to_vec(), sort); - let global_limit = global_limit_exec(spm, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " SortPreservingMergeExec: [c1@0 ASC]", - " SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "SortPreservingMergeExec: [c1@0 ASC], fetch=5", - " SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions( - ) -> Result<()> { - let schema = create_schema(); - let streaming_table = streaming_table_exec(Arc::clone(&schema))?; - let repartition = repartition_exec(streaming_table)?; - let filter = filter_exec(schema, repartition)?; - let coalesce_partitions = coalesce_partitions_exec(filter); - let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " CoalescePartitionsExec", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "GlobalLimitExec: skip=0, fetch=5", - " CoalescePartitionsExec", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn merges_local_limit_with_local_limit() -> Result<()> { - let schema = create_schema(); - let empty_exec = empty_exec(schema); - let child_local_limit = local_limit_exec(empty_exec, 10); - let parent_local_limit = local_limit_exec(child_local_limit, 20); - - let initial = get_plan_string(&parent_local_limit); - let expected_initial = [ - "LocalLimitExec: fetch=20", - " LocalLimitExec: fetch=10", - " EmptyExec", - ]; - - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(parent_local_limit, &ConfigOptions::new())?; - - let 
expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"];
- assert_eq!(get_plan_string(&after_optimize), expected);
-
- Ok(())
- }
-
- #[test]
- fn merges_global_limit_with_global_limit() -> Result<()> {
- let schema = create_schema();
- let empty_exec = empty_exec(schema);
- let child_global_limit = global_limit_exec(empty_exec, 10, Some(30));
- let parent_global_limit = global_limit_exec(child_global_limit, 10, Some(20));
-
- let initial = get_plan_string(&parent_global_limit);
- let expected_initial = [
- "GlobalLimitExec: skip=10, fetch=20",
- " GlobalLimitExec: skip=10, fetch=30",
- " EmptyExec",
- ];
-
- assert_eq!(initial, expected_initial);
-
- let after_optimize =
- LimitPushdown::new().optimize(parent_global_limit, &ConfigOptions::new())?;
-
- let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"];
- assert_eq!(get_plan_string(&after_optimize), expected);
-
- Ok(())
- }
-
- #[test]
- fn merges_global_limit_with_local_limit() -> Result<()> {
- let schema = create_schema();
- let empty_exec = empty_exec(schema);
- let local_limit = local_limit_exec(empty_exec, 40);
- let global_limit = global_limit_exec(local_limit, 20, Some(30));
-
- let initial = get_plan_string(&global_limit);
- let expected_initial = [
- "GlobalLimitExec: skip=20, fetch=30",
- " LocalLimitExec: fetch=40",
- " EmptyExec",
- ];
-
- assert_eq!(initial, expected_initial);
-
- let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
- let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"];
- assert_eq!(get_plan_string(&after_optimize), expected);
-
- Ok(())
- }
-
- #[test]
- fn merges_local_limit_with_global_limit() -> Result<()> {
- let schema = create_schema();
- let empty_exec = empty_exec(schema);
- let global_limit = global_limit_exec(empty_exec, 20, Some(30));
- let local_limit = local_limit_exec(global_limit, 20);
-
- let initial = get_plan_string(&local_limit);
- let expected_initial = [
- "LocalLimitExec: fetch=20",
- " GlobalLimitExec: skip=20, fetch=30",
- " EmptyExec",
- ];
-
- assert_eq!(initial, expected_initial);
-
- let after_optimize =
- LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?;
-
- let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"];
- assert_eq!(get_plan_string(&after_optimize), expected);
-
- Ok(())
- }
-
- fn create_schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Int32, true),
- Field::new("c2", DataType::Int32, true),
- Field::new("c3", DataType::Int32, true),
- ]))
- }
-
- fn streaming_table_exec(schema: SchemaRef) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(StreamingTableExec::try_new(
- Arc::clone(&schema),
- vec![Arc::new(DummyStreamPartition { schema }) as _],
- None,
- None,
- true,
- None,
- )?))
- }
-
- fn global_limit_exec(
- input: Arc<dyn ExecutionPlan>,
- skip: usize,
- fetch: Option<usize>,
- ) -> Arc<dyn ExecutionPlan> {
- Arc::new(GlobalLimitExec::new(input, skip, fetch))
- }
-
- fn local_limit_exec(
- input: Arc<dyn ExecutionPlan>,
- fetch: usize,
- ) -> Arc<dyn ExecutionPlan> {
- Arc::new(LocalLimitExec::new(input, fetch))
- }
-
- fn sort_exec(
- sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
- input: Arc<dyn ExecutionPlan>,
- ) -> Arc<dyn ExecutionPlan> {
- let sort_exprs = sort_exprs.into_iter().collect();
- Arc::new(SortExec::new(sort_exprs, input))
- }
-
- fn sort_preserving_merge_exec(
- sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
- input: Arc<dyn ExecutionPlan>,
- ) -> Arc<dyn ExecutionPlan> {
- let sort_exprs = sort_exprs.into_iter().collect();
- Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
- }
-
- fn projection_exec(
- schema: SchemaRef,
- input: Arc<dyn ExecutionPlan>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(ProjectionExec::try_new(
- vec![
- (col("c1", schema.as_ref()).unwrap(), "c1".to_string()),
- (col("c2", schema.as_ref()).unwrap(), "c2".to_string()),
- (col("c3", schema.as_ref()).unwrap(), "c3".to_string()),
- ],
- input,
- )?))
- }
-
- fn filter_exec(
- schema: SchemaRef,
- input: Arc<dyn ExecutionPlan>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(FilterExec::try_new(
- Arc::new(BinaryExpr::new(
- col("c3", schema.as_ref()).unwrap(),
- Operator::Gt,
- lit(0),
- )),
- input,
- )?))
- }
-
- fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
- Arc::new(CoalesceBatchesExec::new(input, 8192))
- }
-
- fn coalesce_partitions_exec(
- local_limit: Arc<dyn ExecutionPlan>,
- ) -> Arc<dyn ExecutionPlan> {
- Arc::new(CoalescePartitionsExec::new(local_limit))
- }
-
- fn repartition_exec(
- streaming_table: Arc<dyn ExecutionPlan>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(RepartitionExec::try_new(
- streaming_table,
- Partitioning::RoundRobinBatch(8),
- )?))
- }
-
- fn empty_exec(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
- Arc::new(EmptyExec::new(schema))
- }
-}