From a668f580b0d1759232ae195d2469c9c82fdf347a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 22 Jan 2025 17:34:26 -0500 Subject: [PATCH 1/2] consolidate physical_optimizer tests into core/tests/physical_optimizer Move tests --- .../aggregate_statistics.rs | 306 +++++++ .../physical_optimizer/enforce_sorting.rs | 832 ++++++++++++++++- .../limited_distinct_aggregation.rs | 108 ++- .../physical_optimizer/sanity_checker.rs | 285 +++++- .../src/aggregate_statistics.rs | 312 +------ .../src/enforce_sorting/mod.rs | 2 + .../tests/enforce_sorting.rs | 861 ------------------ .../tests/limited_distinct_aggregation.rs | 131 --- datafusion/physical-optimizer/tests/mod.rs | 20 - .../tests/sanity_checker.rs | 308 ------- 10 files changed, 1527 insertions(+), 1638 deletions(-) create mode 100644 datafusion/core/tests/physical_optimizer/aggregate_statistics.rs delete mode 100644 datafusion/physical-optimizer/tests/enforce_sorting.rs delete mode 100644 datafusion/physical-optimizer/tests/limited_distinct_aggregation.rs delete mode 100644 datafusion/physical-optimizer/tests/mod.rs delete mode 100644 datafusion/physical-optimizer/tests/sanity_checker.rs diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs new file mode 100644 index 000000000000..b82fc046a9a9 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -0,0 +1,306 @@ + +use datafusion_common::config::ConfigOptions; +use datafusion_execution::TaskContext; +use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::aggregates::AggregateExec; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::ExecutionPlan; +use std::sync::Arc; + +use datafusion_common::Result; +use datafusion_expr_common::operator::Operator; + +use datafusion_physical_plan::aggregates::PhysicalGroupBy; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::common; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::memory::MemoryExec; + +use arrow::array::Int32Array; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion_common::cast::as_int64_array; +use datafusion_physical_expr::expressions::{self, cast}; +use datafusion_physical_optimizer::test_utils::TestAggregate; +use datafusion_physical_plan::aggregates::AggregateMode; + +/// Mock data using a MemoryExec which has an exact count statistic +fn mock_data() -> Result> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), None])), + Arc::new(Int32Array::from(vec![Some(4), None, Some(6)])), + ], + )?; + + Ok(Arc::new(MemoryExec::try_new( + &[vec![batch]], + Arc::clone(&schema), + None, + )?)) +} + +/// Checks that the count optimization was applied and we still get the right result +async fn assert_count_optim_success( + plan: AggregateExec, + agg: TestAggregate, +) -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let plan: Arc = Arc::new(plan); + + let config = ConfigOptions::new(); + let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?; + + // A ProjectionExec is a sign that the count optimization was applied + assert!(optimized.as_any().is::()); + + // run both the optimized and nonoptimized plan + let optimized_result = + common::collect(optimized.execute(0, Arc::clone(&task_ctx))?).await?; + let nonoptimized_result = common::collect(plan.execute(0, task_ctx)?).await?; + assert_eq!(optimized_result.len(), nonoptimized_result.len()); + + // and validate the results are the same and expected + assert_eq!(optimized_result.len(), 1); + check_batch(optimized_result.into_iter().next().unwrap(), &agg); + // check the non optimized one too to ensure types and names remain the same + assert_eq!(nonoptimized_result.len(), 1); + check_batch(nonoptimized_result.into_iter().next().unwrap(), &agg); + + Ok(()) +} + +fn check_batch(batch: RecordBatch, agg: &TestAggregate) { + let schema = batch.schema(); + let fields = schema.fields(); + assert_eq!(fields.len(), 1); + + let field = &fields[0]; + assert_eq!(field.name(), agg.column_name()); + assert_eq!(field.data_type(), &DataType::Int64); + // note that nullability differs + + assert_eq!( + as_int64_array(batch.column(0)).unwrap().values(), + &[agg.expected_count()] + ); +} + +#[tokio::test] +async fn test_count_partial_direct_child() -> Result<()> { + // basic test case with the aggregation applied on a source with exact statistics + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_star(); + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + source, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + assert_count_optim_success(final_agg, agg).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_count_partial_with_nulls_direct_child() -> Result<()> { + // basic test case with the aggregation applied on a source with exact statistics + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_column(&schema); + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + source, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + assert_count_optim_success(final_agg, agg).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_count_partial_indirect_child() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_star(); + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + source, + Arc::clone(&schema), + )?; + + // We introduce an intermediate optimization step between the partial and final aggregator + let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + Arc::new(coalesce), + Arc::clone(&schema), + )?; + + assert_count_optim_success(final_agg, agg).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_count_partial_with_nulls_indirect_child() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_column(&schema); + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + source, + Arc::clone(&schema), + )?; + + // We introduce an intermediate optimization step between the partial and final aggregator + let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + Arc::new(coalesce), + Arc::clone(&schema), + )?; + + assert_count_optim_success(final_agg, agg).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_count_inexact_stat() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_star(); + + // adding a filter makes the statistics inexact + let filter = Arc::new(FilterExec::try_new( + expressions::binary( + expressions::col("a", &schema)?, + Operator::Gt, + cast(expressions::lit(1u32), &schema, DataType::Int32)?, + &schema, + )?, + source, + )?); + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + filter, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ConfigOptions::new(); + let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + // check that the original ExecutionPlan was not replaced + assert!(optimized.as_any().is::()); + + Ok(()) +} + +#[tokio::test] +async fn test_count_with_nulls_inexact_stat() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_column(&schema); + + // adding a filter makes the statistics inexact + let filter = Arc::new(FilterExec::try_new( + expressions::binary( + expressions::col("a", &schema)?, + Operator::Gt, + cast(expressions::lit(1u32), &schema, DataType::Int32)?, + &schema, + )?, + source, + )?); + + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + filter, + Arc::clone(&schema), + )?; + + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + PhysicalGroupBy::default(), + vec![Arc::new(agg.count_expr(&schema))], + vec![None], + Arc::new(partial_agg), + Arc::clone(&schema), + )?; + + let conf = ConfigOptions::new(); + let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; + + // check that the original ExecutionPlan was not replaced + assert!(optimized.as_any().is::()); + + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 4fe04753fd9c..d83458f11674 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -37,13 +37,16 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE use datafusion_physical_plan::{get_plan_string, ExecutionPlan}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TreeNode, TransformedResult}; -use datafusion_physical_optimizer::test_utils::{check_integrity,bounded_window_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec}; +use datafusion_physical_optimizer::test_utils::{check_integrity, bounded_window_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec, coalesce_batches_exec, aggregate_exec, RequirementsTestExec}; use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use rstest::rstest; +use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution; +use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::sorts::sort::SortExec; /// Create a csv exec for tests fn csv_exec_ordered( @@ -1236,3 +1239,830 @@ async fn test_not_replaced_with_partial_sort_for_bounded_input() -> Result<()> { assert_optimized!(expected_input, expected_no_change, physical_plan, false); Ok(()) } + + + +/// Runs the sort enforcement optimizer and asserts the plan +/// against the original and expected plans +/// +/// `$EXPECTED_PLAN_LINES`: input plan +/// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan +/// `$PLAN`: the plan to optimized +/// `REPARTITION_SORTS`: Flag to set `config.options.optimizer.repartition_sorts` option. +macro_rules! assert_optimized { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $REPARTITION_SORTS: expr) => { + let mut config = ConfigOptions::new(); + config.optimizer.repartition_sorts = $REPARTITION_SORTS; + + // This file has 4 rules that use tree node, apply these rules as in the + // EnforceSorting::optimize implementation + // After these operations tree nodes should be in a consistent state. + // This code block makes sure that these rules doesn't violate tree node integrity. + { + let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone()); + let adjusted = plan_requirements + .transform_up(ensure_sorting) + .data() + .and_then(check_integrity)?; + // TODO: End state payloads will be checked here. + + let new_plan = if config.optimizer.repartition_sorts { + let plan_with_coalesce_partitions = + PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); + let parallel = plan_with_coalesce_partitions + .transform_up(parallelize_sorts) + .data() + .and_then(check_integrity)?; + // TODO: End state payloads will be checked here. + parallel.plan + } else { + adjusted.plan + }; + + let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); + let updated_plan = plan_with_pipeline_fixer + .transform_up(|plan_with_pipeline_fixer| { + replace_with_order_preserving_variants( + plan_with_pipeline_fixer, + false, + true, + &config, + ) + }) + .data() + .and_then(check_integrity)?; + // TODO: End state payloads will be checked here. + + let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); + assign_initial_requirements(&mut sort_pushdown); + check_integrity(pushdown_sorts(sort_pushdown)?)?; + // TODO: End state payloads will be checked here. + } + + let physical_plan = $PLAN; + let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + + let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES + .iter().map(|s| *s).collect(); + + assert_eq!( + expected_plan_lines, actual, + "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES + .iter().map(|s| *s).collect(); + + // Run the actual optimizer + let optimized_physical_plan = + EnforceSorting::new().optimize(physical_plan, &config)?; + + // Get string representation of the plan + let actual = get_plan_string(&optimized_physical_plan); + assert_eq!( + expected_optimized_lines, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + }; +} + +#[tokio::test] +async fn test_remove_unnecessary_sort() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source); + let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], input); + + let expected_input = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr_options( + "non_nullable_col", + &source.schema(), + SortOptions { + descending: true, + nulls_first: true, + }, + )]; + let sort = sort_exec(sort_exprs.clone(), source); + // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before + let coalesce_batches = coalesce_batches_exec(sort); + + let window_agg = + bounded_window_exec("non_nullable_col", sort_exprs, coalesce_batches); + + let sort_exprs = vec![sort_expr_options( + "non_nullable_col", + &window_agg.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + + let sort = sort_exec(sort_exprs.clone(), window_agg); + + // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before + let filter = filter_exec( + Arc::new(NotExpr::new( + col("non_nullable_col", schema.as_ref()).unwrap(), + )), + sort, + ); + + let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs, filter); + + let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " FilterExec: NOT non_nullable_col@1", + " SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " CoalesceBatchesExec: target_batch_size=128", + " SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]"]; + + let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]", + " FilterExec: NOT non_nullable_col@1", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " CoalesceBatchesExec: target_batch_size=128", + " SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]"]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_add_required_sort() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + + let physical_plan = sort_preserving_merge_exec(sort_exprs, source); + + let expected_input = [ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort1() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), spm); + let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); + let expected_input = [ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort2() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort2 = sort_exec(sort_exprs.clone(), spm); + let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); + + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + let sort3 = sort_exec(sort_exprs, spm2); + let physical_plan = repartition_exec(repartition_exec(sort3)); + + let expected_input = [ + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort3() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let sort_exprs = LexOrdering::new(vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]); + let repartition_exec = repartition_exec(spm); + let sort2 = Arc::new( + SortExec::new(sort_exprs.clone(), repartition_exec) + .with_preserve_partitioning(true), + ) as _; + let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); + + let physical_plan = aggregate_exec(spm2); + + // When removing a `SortPreservingMergeExec`, make sure that partitioning + // requirements are not violated. In some cases, we may need to replace + // it with a `CoalescePartitionsExec` instead of directly removing it. + let expected_input = [ + "AggregateExec: mode=Final, gby=[], aggr=[]", + " SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "AggregateExec: mode=Final, gby=[], aggr=[]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort4() -> Result<()> { + let schema = create_test_schema()?; + let source1 = repartition_exec(memory_exec(&schema)); + + let source2 = repartition_exec(memory_exec(&schema)); + let union = union_exec(vec![source1, source2]); + + let sort_exprs = LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]); + // let sort = sort_exec(sort_exprs.clone(), union); + let sort = Arc::new( + SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true), + ) as _; + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let filter = filter_exec( + Arc::new(NotExpr::new( + col("non_nullable_col", schema.as_ref()).unwrap(), + )), + spm, + ); + + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let physical_plan = sort_exec(sort_exprs, filter); + + // When removing a `SortPreservingMergeExec`, make sure that partitioning + // requirements are not violated. In some cases, we may need to replace + // it with a `CoalescePartitionsExec` instead of directly removing it. + let expected_input = ["SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", + " FilterExec: NOT non_nullable_col@1", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[true]", + " UnionExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]"]; + + let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]", + " FilterExec: NOT non_nullable_col@1", + " UnionExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]"]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort6() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let input = Arc::new( + SortExec::new( + LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), + source, + ) + .with_fetch(Some(2)), + ); + let physical_plan = sort_exec( + vec![ + sort_expr("non_nullable_col", &schema), + sort_expr("nullable_col", &schema), + ], + input, + ); + + let expected_input = [ + "SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", + " SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort7() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let input = Arc::new(SortExec::new( + LexOrdering::new(vec![ + sort_expr("non_nullable_col", &schema), + sort_expr("nullable_col", &schema), + ]), + source, + )); + + let physical_plan = Arc::new( + SortExec::new( + LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), + input, + ) + .with_fetch(Some(2)), + ) as Arc; + + let expected_input = [ + "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "GlobalLimitExec: skip=0, fetch=2", + " SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_sort8() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let input = Arc::new(SortExec::new( + LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), + source, + )); + let limit = Arc::new(LocalLimitExec::new(input, 2)); + let physical_plan = sort_exec( + vec![ + sort_expr("non_nullable_col", &schema), + sort_expr("nullable_col", &schema), + ], + limit, + ); + + let expected_input = [ + "SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", + " LocalLimitExec: fetch=2", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "LocalLimitExec: fetch=2", + " SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_do_not_pushdown_through_limit() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + // let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source); + let input = Arc::new(SortExec::new( + LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), + source, + )); + let limit = Arc::new(GlobalLimitExec::new(input, 0, Some(5))) as _; + let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], limit); + + let expected_input = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " GlobalLimitExec: skip=0, fetch=5", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " GlobalLimitExec: skip=0, fetch=5", + " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_unnecessary_spm1() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let input = + sort_preserving_merge_exec(vec![sort_expr("non_nullable_col", &schema)], source); + let input2 = + sort_preserving_merge_exec(vec![sort_expr("non_nullable_col", &schema)], input); + let physical_plan = + sort_preserving_merge_exec(vec![sort_expr("nullable_col", &schema)], input2); + + let expected_input = [ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_change_wrong_sorting() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort = sort_exec(vec![sort_exprs[0].clone()], source); + let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); + let expected_input = [ + "SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_change_wrong_sorting2() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let spm1 = sort_preserving_merge_exec(sort_exprs.clone(), source); + let sort2 = sort_exec(vec![sort_exprs[0].clone()], spm1); + let physical_plan = sort_preserving_merge_exec(vec![sort_exprs[1].clone()], sort2); + + let expected_input = [ + "SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_multiple_sort_window_exec() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + + let sort_exprs1 = vec![sort_expr("nullable_col", &schema)]; + let sort_exprs2 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + + let sort1 = sort_exec(sort_exprs1.clone(), source); + let window_agg1 = bounded_window_exec("non_nullable_col", sort_exprs1.clone(), sort1); + let window_agg2 = bounded_window_exec("non_nullable_col", sort_exprs2, window_agg1); + // let filter_exec = sort_exec; + let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2); + + let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]"]; + + let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]"]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +// With new change in SortEnforcement EnforceSorting->EnforceDistribution->EnforceSorting +// should produce same result with EnforceDistribution+EnforceSorting +// This enables us to use EnforceSorting possibly before EnforceDistribution +// Given that it will be called at least once after last EnforceDistribution. The reason is that +// EnforceDistribution may invalidate ordering invariant. +async fn test_commutativity() -> Result<()> { + let schema = create_test_schema()?; + let config = ConfigOptions::new(); + + let memory_exec = memory_exec(&schema); + let sort_exprs = LexOrdering::new(vec![sort_expr("nullable_col", &schema)]); + let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec); + let repartition = repartition_exec(window); + + let orig_plan = + Arc::new(SortExec::new(sort_exprs, repartition)) as Arc; + let actual = get_plan_string(&orig_plan); + let expected_input = vec![ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_eq!( + expected_input, actual, + "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_input:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let mut plan = orig_plan.clone(); + let rules = vec![ + Arc::new(EnforceDistribution::new()) as Arc, + Arc::new(EnforceSorting::new()) as Arc, + ]; + for rule in rules { + plan = rule.optimize(plan, &config)?; + } + let first_plan = plan.clone(); + + let mut plan = orig_plan.clone(); + let rules = vec![ + Arc::new(EnforceSorting::new()) as Arc, + Arc::new(EnforceDistribution::new()) as Arc, + Arc::new(EnforceSorting::new()) as Arc, + ]; + for rule in rules { + plan = rule.optimize(plan, &config)?; + } + let second_plan = plan.clone(); + + assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan)); + Ok(()) +} + +#[tokio::test] +async fn test_coalesce_propagate() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let repartition = repartition_exec(source); + let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition)); + let repartition = repartition_exec(coalesce_partitions); + let sort_exprs = LexOrdering::new(vec![sort_expr("nullable_col", &schema)]); + // Add local sort + let sort = Arc::new( + SortExec::new(sort_exprs.clone(), repartition).with_preserve_partitioning(true), + ) as _; + let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort); + let sort = sort_exec(sort_exprs, spm); + + let physical_plan = sort.clone(); + // Sort Parallelize rule should end Coalesce + Sort linkage when Sort is Global Sort + // Also input plan is not valid as it is. We need to add SortExec before SortPreservingMergeExec. + let expected_input = [ + "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + let expected_optimized = [ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[tokio::test] +async fn test_replace_with_partial_sort2() -> Result<()> { + let schema = create_test_schema3()?; + let input_sort_exprs = vec![sort_expr("a", &schema), sort_expr("c", &schema)]; + let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs); + + let physical_plan = sort_exec( + vec![ + sort_expr("a", &schema), + sort_expr("c", &schema), + sort_expr("d", &schema), + ], + unbounded_input, + ); + + let expected_input = [ + "SortExec: expr=[a@0 ASC, c@2 ASC, d@3 ASC], preserve_partitioning=[false]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]" + ]; + // let optimized + let expected_optimized = [ + "PartialSortExec: expr=[a@0 ASC, c@2 ASC, d@3 ASC], common_prefix_length=[2]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + Ok(()) +} + +#[tokio::test] +async fn test_push_with_required_input_ordering_prohibited() -> Result<()> { + // SortExec: expr=[b] <-- can't push this down + // RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order + // SortExec: expr=[a] + // MemoryExec + let schema = create_test_schema3()?; + let sort_exprs_a = LexOrdering::new(vec![sort_expr("a", &schema)]); + let sort_exprs_b = LexOrdering::new(vec![sort_expr("b", &schema)]); + let plan = memory_exec(&schema); + let plan = sort_exec(sort_exprs_a.clone(), plan); + let plan = RequirementsTestExec::new(plan) + .with_required_input_ordering(sort_exprs_a) + .with_maintains_input_order(true) + .into_arc(); + let plan = sort_exec(sort_exprs_b, plan); + + let expected_input = [ + "SortExec: expr=[b@1 ASC], preserve_partitioning=[false]", + " RequiredInputOrderingExec", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + // should not be able to push shorts + let expected_no_change = expected_input; + assert_optimized!(expected_input, expected_no_change, plan, true); + Ok(()) +} + +// test when the required input ordering is satisfied so could push through +#[tokio::test] +async fn test_push_with_required_input_ordering_allowed() -> Result<()> { + // SortExec: expr=[a,b] <-- can push this down (as it is compatible with the required input ordering) + // RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order + // SortExec: expr=[a] + // MemoryExec + let schema = create_test_schema3()?; + let sort_exprs_a = LexOrdering::new(vec![sort_expr("a", &schema)]); + let sort_exprs_ab = + LexOrdering::new(vec![sort_expr("a", &schema), sort_expr("b", &schema)]); + let plan = memory_exec(&schema); + let plan = sort_exec(sort_exprs_a.clone(), plan); + let plan = RequirementsTestExec::new(plan) + .with_required_input_ordering(sort_exprs_a) + .with_maintains_input_order(true) + .into_arc(); + let plan = sort_exec(sort_exprs_ab, plan); + + let expected_input = [ + "SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]", + " RequiredInputOrderingExec", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + // should able to push shorts + let expected = [ + "RequiredInputOrderingExec", + " SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected, plan, true); + Ok(()) +} + +#[tokio::test] +async fn test_replace_with_partial_sort() -> Result<()> { + let schema = create_test_schema3()?; + let input_sort_exprs = vec![sort_expr("a", &schema)]; + let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs); + + let physical_plan = sort_exec( + vec![sort_expr("a", &schema), sort_expr("c", &schema)], + unbounded_input, + ); + + let expected_input = [ + "SortExec: expr=[a@0 ASC, c@2 ASC], preserve_partitioning=[false]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]" + ]; + let expected_optimized = [ + "PartialSortExec: expr=[a@0 ASC, c@2 ASC], common_prefix_length=[1]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + Ok(()) +} + +#[tokio::test] +async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()> { + let schema = create_test_schema3()?; + let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)]; + let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs); + + let physical_plan = sort_exec( + vec![ + sort_expr("a", &schema), + sort_expr("b", &schema), + sort_expr("c", &schema), + ], + unbounded_input, + ); + let expected_input = [ + "SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]" + ]; + let expected_no_change = expected_input; + assert_optimized!(expected_input, expected_no_change, physical_plan, true); + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs index 4373dc40de38..21526fcb7d55 100644 --- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -22,14 +22,15 @@ use std::sync::Arc; use crate::physical_optimizer::parquet_exec_with_sort; use arrow::{compute::SortOptions, util::pretty::pretty_format_batches}; +use arrow_schema::DataType; use datafusion::prelude::SessionContext; use datafusion_common::Result; use datafusion_execution::config::SessionConfig; -use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; +use datafusion_expr::Operator; +use datafusion_physical_expr::{expressions, expressions::col, PhysicalSortExpr}; +use datafusion_physical_expr::expressions::cast; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_optimizer::test_utils::{ - assert_plan_matches_expected, build_group_by, mock_data, schema, -}; +use datafusion_physical_optimizer::test_utils::{assert_plan_matches_expected, build_group_by, mock_data, schema, TestAggregate}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode}, collect, @@ -264,3 +265,102 @@ fn test_has_order_by() -> Result<()> { assert_plan_matches_expected(&plan, &expected)?; Ok(()) } + + + +#[test] +fn test_no_group_by() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema, vec![]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema, /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[], aggr=[]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} + +#[test] +fn test_has_aggregate_expression() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_star(); + + // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema, vec!["a".to_string()]), + vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */ + vec![None], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} + +#[test] +fn test_has_filter() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec + // the `a > 1` filter is applied in the AggregateExec + let filter_expr = Some(expressions::binary( + col("a", &schema)?, + Operator::Gt, + cast(expressions::lit(1u32), &schema, DataType::Int32)?, + &schema, + )?); + let agg = TestAggregate::new_count_star(); + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */ + vec![filter_expr], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 7636df9b3db9..60693da8dd1e 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -16,12 +16,20 @@ // under the License. use std::sync::Arc; - +use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; use datafusion::prelude::{CsvReadOptions, SessionContext}; -use datafusion_common::Result; +use datafusion_common::{JoinType, Result}; use async_trait::async_trait; +use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; +use datafusion_physical_optimizer::test_utils::{bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec}; +use datafusion_physical_plan::{displayable, ExecutionPlan}; +use datafusion_physical_plan::repartition::RepartitionExec; async fn register_current_csv( ctx: &SessionContext, @@ -361,3 +369,276 @@ async fn test_analyzer() -> Result<()> { case.run().await?; Ok(()) } + + +fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) +} + +fn create_test_schema2() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])) +} + +/// Check if sanity checker should accept or reject plans. +fn assert_sanity_check(plan: &Arc, is_sane: bool) { + let sanity_checker = SanityCheckPlan::new(); + let opts = ConfigOptions::default(); + assert_eq!( + sanity_checker.optimize(plan.clone(), &opts).is_ok(), + is_sane + ); +} + +/// Check if the plan we created is as expected by comparing the plan +/// formatted as a string. +fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { + let plan_str = displayable(plan).indent(true).to_string(); + let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); + assert_eq!(actual_lines, expected_lines); +} + +#[tokio::test] +/// Tests that plan is valid when the sort requirements are satisfied. +async fn test_bounded_window_agg_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let sort = sort_exec(sort_exprs.clone(), source); + let bw = bounded_window_exec("c9", sort_exprs, sort); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + assert_sanity_check(&bw, true); + Ok(()) +} + +#[tokio::test] +/// Tests that plan is invalid when the sort requirements are not satisfied. +async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let bw = bounded_window_exec("c9", sort_exprs, source); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. + assert_sanity_check(&bw, false); + Ok(()) +} + +#[tokio::test] +/// A valid when a single partition requirement +/// is satisfied. +async fn test_global_limit_single_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) +} + +#[tokio::test] +/// An invalid plan when a single partition requirement +/// is not satisfied. +async fn test_global_limit_multi_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(repartition_exec(source)); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. + assert_sanity_check(&limit, false); + Ok(()) +} + +#[tokio::test] +/// A plan with no requirements should satisfy. +async fn test_local_limit() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = local_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "LocalLimitExec: fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) +} + +#[tokio::test] +/// Valid plan with multiple children satisfy both order and distribution. +async fn test_sort_merge_join_satisfied() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&smj, true); + Ok(()) +} + +#[tokio::test] +/// Invalid case when the order is not satisfied by the 2nd +/// child. +async fn test_sort_merge_join_order_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let right = memory_exec(&schema2); + let sort_exprs1 = vec![sort_expr_options( + "c9", + &source1.schema(), + SortOptions::default(), + )]; + let left = sort_exec(sort_exprs1, source1); + // Missing sort of the right child here.. + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) +} + +#[tokio::test] +/// Invalid case when the distribution is not satisfied by the 2nd +/// child. +async fn test_sort_merge_join_dist_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::RoundRobinBatch(10), + )?); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + // Missing hash partitioning on right child. + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) +} diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 68bfd63b9a19..44991c751059 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -147,314 +147,4 @@ fn take_optimizable_value_from_statistics( value.map(|val| (val, agg_expr.name().to_string())) } -#[cfg(test)] -mod tests { - use crate::aggregate_statistics::AggregateStatistics; - use crate::PhysicalOptimizerRule; - use datafusion_common::config::ConfigOptions; - use datafusion_execution::TaskContext; - use datafusion_physical_plan::aggregates::AggregateExec; - use datafusion_physical_plan::projection::ProjectionExec; - use datafusion_physical_plan::ExecutionPlan; - use std::sync::Arc; - - use datafusion_common::Result; - use datafusion_expr_common::operator::Operator; - - use datafusion_physical_plan::aggregates::PhysicalGroupBy; - use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; - use datafusion_physical_plan::common; - use datafusion_physical_plan::filter::FilterExec; - use datafusion_physical_plan::memory::MemoryExec; - - use crate::test_utils::TestAggregate; - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use datafusion_common::cast::as_int64_array; - use datafusion_physical_expr::expressions::{self, cast}; - use datafusion_physical_plan::aggregates::AggregateMode; - - /// Mock data using a MemoryExec which has an exact count statistic - fn mock_data() -> Result> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])); - - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(vec![Some(1), Some(2), None])), - Arc::new(Int32Array::from(vec![Some(4), None, Some(6)])), - ], - )?; - - Ok(Arc::new(MemoryExec::try_new( - &[vec![batch]], - Arc::clone(&schema), - None, - )?)) - } - - /// Checks that the count optimization was applied and we still get the right result - async fn assert_count_optim_success( - plan: AggregateExec, - agg: TestAggregate, - ) -> Result<()> { - let task_ctx = Arc::new(TaskContext::default()); - let plan: Arc = Arc::new(plan); - - let config = ConfigOptions::new(); - let optimized = - AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?; - - // A ProjectionExec is a sign that the count optimization was applied - assert!(optimized.as_any().is::()); - - // run both the optimized and nonoptimized plan - let optimized_result = - common::collect(optimized.execute(0, Arc::clone(&task_ctx))?).await?; - let nonoptimized_result = common::collect(plan.execute(0, task_ctx)?).await?; - assert_eq!(optimized_result.len(), nonoptimized_result.len()); - - // and validate the results are the same and expected - assert_eq!(optimized_result.len(), 1); - check_batch(optimized_result.into_iter().next().unwrap(), &agg); - // check the non optimized one too to ensure types and names remain the same - assert_eq!(nonoptimized_result.len(), 1); - check_batch(nonoptimized_result.into_iter().next().unwrap(), &agg); - - Ok(()) - } - - fn check_batch(batch: RecordBatch, agg: &TestAggregate) { - let schema = batch.schema(); - let fields = schema.fields(); - assert_eq!(fields.len(), 1); - - let field = &fields[0]; - assert_eq!(field.name(), agg.column_name()); - assert_eq!(field.data_type(), &DataType::Int64); - // note that nullability differs - - assert_eq!( - as_int64_array(batch.column(0)).unwrap().values(), - &[agg.expected_count()] - ); - } - - #[tokio::test] - async fn test_count_partial_direct_child() -> Result<()> { - // basic test case with the aggregation applied on a source with exact statistics - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_with_nulls_direct_child() -> Result<()> { - // basic test case with the aggregation applied on a source with exact statistics - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - source, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_indirect_child() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - source, - Arc::clone(&schema), - )?; - - // We introduce an intermediate optimization step between the partial and final aggregator - let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(coalesce), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_with_nulls_indirect_child() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - source, - Arc::clone(&schema), - )?; - - // We introduce an intermediate optimization step between the partial and final aggregator - let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(coalesce), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_inexact_stat() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - // adding a filter makes the statistics inexact - let filter = Arc::new(FilterExec::try_new( - expressions::binary( - expressions::col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?, - source, - )?); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - filter, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - let optimized = - AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; - - // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); - - Ok(()) - } - - #[tokio::test] - async fn test_count_with_nulls_inexact_stat() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - // adding a filter makes the statistics inexact - let filter = Arc::new(FilterExec::try_new( - expressions::binary( - expressions::col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?, - source, - )?); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - filter, - Arc::clone(&schema), - )?; - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(partial_agg), - Arc::clone(&schema), - )?; - - let conf = ConfigOptions::new(); - let optimized = - AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; - - // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); - - Ok(()) - } -} +// Tests are in tests/cases/aggregate_statistics.rs diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index f098f16d9b84..2d23894d6b5e 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -656,3 +656,5 @@ fn get_sort_exprs( plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec") } } + +// Tests are in tests/cases/enforce_sorting.rs diff --git a/datafusion/physical-optimizer/tests/enforce_sorting.rs b/datafusion/physical-optimizer/tests/enforce_sorting.rs deleted file mode 100644 index df1a484e1585..000000000000 --- a/datafusion/physical-optimizer/tests/enforce_sorting.rs +++ /dev/null @@ -1,861 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use arrow::compute::SortOptions; -use datafusion_common::Result; -use datafusion_physical_expr::expressions::{col, NotExpr}; -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use datafusion_physical_plan::displayable; -use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_optimizer::enforce_sorting::{EnforceSorting,PlanWithCorrespondingCoalescePartitions,PlanWithCorrespondingSort,parallelize_sorts,ensure_sorting}; -use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants,OrderPreservationContext}; -use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts}; -use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion_physical_plan::sorts::sort::SortExec; -use datafusion_physical_plan::{get_plan_string, ExecutionPlan}; -use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{TreeNode, TransformedResult}; -use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution; -use datafusion_physical_optimizer::test_utils::{check_integrity,aggregate_exec, bounded_window_exec, coalesce_batches_exec, create_test_schema, create_test_schema3, filter_exec, memory_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_preserving_merge_exec, stream_exec_ordered, union_exec, RequirementsTestExec}; - -/// Runs the sort enforcement optimizer and asserts the plan -/// against the original and expected plans -/// -/// `$EXPECTED_PLAN_LINES`: input plan -/// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan -/// `$PLAN`: the plan to optimized -/// `REPARTITION_SORTS`: Flag to set `config.options.optimizer.repartition_sorts` option. -macro_rules! assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $REPARTITION_SORTS: expr) => { - let mut config = ConfigOptions::new(); - config.optimizer.repartition_sorts = $REPARTITION_SORTS; - - // This file has 4 rules that use tree node, apply these rules as in the - // EnforceSorting::optimize implementation - // After these operations tree nodes should be in a consistent state. - // This code block makes sure that these rules doesn't violate tree node integrity. - { - let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone()); - let adjusted = plan_requirements - .transform_up(ensure_sorting) - .data() - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - - let new_plan = if config.optimizer.repartition_sorts { - let plan_with_coalesce_partitions = - PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); - let parallel = plan_with_coalesce_partitions - .transform_up(parallelize_sorts) - .data() - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - parallel.plan - } else { - adjusted.plan - }; - - let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan); - let updated_plan = plan_with_pipeline_fixer - .transform_up(|plan_with_pipeline_fixer| { - replace_with_order_preserving_variants( - plan_with_pipeline_fixer, - false, - true, - &config, - ) - }) - .data() - .and_then(check_integrity)?; - // TODO: End state payloads will be checked here. - - let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); - assign_initial_requirements(&mut sort_pushdown); - check_integrity(pushdown_sorts(sort_pushdown)?)?; - // TODO: End state payloads will be checked here. - } - - let physical_plan = $PLAN; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - - let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES - .iter().map(|s| *s).collect(); - - assert_eq!( - expected_plan_lines, actual, - "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES - .iter().map(|s| *s).collect(); - - // Run the actual optimizer - let optimized_physical_plan = - EnforceSorting::new().optimize(physical_plan, &config)?; - - // Get string representation of the plan - let actual = get_plan_string(&optimized_physical_plan); - assert_eq!( - expected_optimized_lines, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - }; -} - -#[tokio::test] -async fn test_remove_unnecessary_sort() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source); - let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], input); - - let expected_input = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - - let sort_exprs = vec![sort_expr_options( - "non_nullable_col", - &source.schema(), - SortOptions { - descending: true, - nulls_first: true, - }, - )]; - let sort = sort_exec(sort_exprs.clone(), source); - // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before - let coalesce_batches = coalesce_batches_exec(sort); - - let window_agg = - bounded_window_exec("non_nullable_col", sort_exprs, coalesce_batches); - - let sort_exprs = vec![sort_expr_options( - "non_nullable_col", - &window_agg.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - - let sort = sort_exec(sort_exprs.clone(), window_agg); - - // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before - let filter = filter_exec( - Arc::new(NotExpr::new( - col("non_nullable_col", schema.as_ref()).unwrap(), - )), - sort, - ); - - let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs, filter); - - let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " FilterExec: NOT non_nullable_col@1", - " SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " CoalesceBatchesExec: target_batch_size=128", - " SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]"]; - - let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL), is_causal: false }]", - " FilterExec: NOT non_nullable_col@1", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " CoalesceBatchesExec: target_batch_size=128", - " SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]"]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_add_required_sort() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - - let sort_exprs = vec![sort_expr("nullable_col", &schema)]; - - let physical_plan = sort_preserving_merge_exec(sort_exprs, source); - - let expected_input = [ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort1() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr("nullable_col", &schema)]; - let sort = sort_exec(sort_exprs.clone(), source); - let spm = sort_preserving_merge_exec(sort_exprs, sort); - - let sort_exprs = vec![sort_expr("nullable_col", &schema)]; - let sort = sort_exec(sort_exprs.clone(), spm); - let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); - let expected_input = [ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort2() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; - let sort = sort_exec(sort_exprs.clone(), source); - let spm = sort_preserving_merge_exec(sort_exprs, sort); - - let sort_exprs = vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]; - let sort2 = sort_exec(sort_exprs.clone(), spm); - let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); - - let sort_exprs = vec![sort_expr("nullable_col", &schema)]; - let sort3 = sort_exec(sort_exprs, spm2); - let physical_plan = repartition_exec(repartition_exec(sort3)); - - let expected_input = [ - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", - " SortPreservingMergeExec: [non_nullable_col@1 ASC]", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - - let expected_optimized = [ - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort3() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; - let sort = sort_exec(sort_exprs.clone(), source); - let spm = sort_preserving_merge_exec(sort_exprs, sort); - - let sort_exprs = LexOrdering::new(vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]); - let repartition_exec = repartition_exec(spm); - let sort2 = Arc::new( - SortExec::new(sort_exprs.clone(), repartition_exec) - .with_preserve_partitioning(true), - ) as _; - let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); - - let physical_plan = aggregate_exec(spm2); - - // When removing a `SortPreservingMergeExec`, make sure that partitioning - // requirements are not violated. In some cases, we may need to replace - // it with a `CoalescePartitionsExec` instead of directly removing it. - let expected_input = [ - "AggregateExec: mode=Final, gby=[], aggr=[]", - " SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortPreservingMergeExec: [non_nullable_col@1 ASC]", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - - let expected_optimized = [ - "AggregateExec: mode=Final, gby=[], aggr=[]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort4() -> Result<()> { - let schema = create_test_schema()?; - let source1 = repartition_exec(memory_exec(&schema)); - - let source2 = repartition_exec(memory_exec(&schema)); - let union = union_exec(vec![source1, source2]); - - let sort_exprs = LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]); - // let sort = sort_exec(sort_exprs.clone(), union); - let sort = Arc::new( - SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true), - ) as _; - let spm = sort_preserving_merge_exec(sort_exprs, sort); - - let filter = filter_exec( - Arc::new(NotExpr::new( - col("non_nullable_col", schema.as_ref()).unwrap(), - )), - spm, - ); - - let sort_exprs = vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]; - let physical_plan = sort_exec(sort_exprs, filter); - - // When removing a `SortPreservingMergeExec`, make sure that partitioning - // requirements are not violated. In some cases, we may need to replace - // it with a `CoalescePartitionsExec` instead of directly removing it. - let expected_input = ["SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", - " FilterExec: NOT non_nullable_col@1", - " SortPreservingMergeExec: [non_nullable_col@1 ASC]", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[true]", - " UnionExec", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]"]; - - let expected_optimized = ["SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[true]", - " FilterExec: NOT non_nullable_col@1", - " UnionExec", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]"]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort6() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let input = Arc::new( - SortExec::new( - LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), - source, - ) - .with_fetch(Some(2)), - ); - let physical_plan = sort_exec( - vec![ - sort_expr("non_nullable_col", &schema), - sort_expr("nullable_col", &schema), - ], - input, - ); - - let expected_input = [ - "SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", - " SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort7() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let input = Arc::new(SortExec::new( - LexOrdering::new(vec![ - sort_expr("non_nullable_col", &schema), - sort_expr("nullable_col", &schema), - ]), - source, - )); - - let physical_plan = Arc::new( - SortExec::new( - LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), - input, - ) - .with_fetch(Some(2)), - ) as Arc; - - let expected_input = [ - "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "GlobalLimitExec: skip=0, fetch=2", - " SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_sort8() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let input = Arc::new(SortExec::new( - LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), - source, - )); - let limit = Arc::new(LocalLimitExec::new(input, 2)); - let physical_plan = sort_exec( - vec![ - sort_expr("non_nullable_col", &schema), - sort_expr("nullable_col", &schema), - ], - limit, - ); - - let expected_input = [ - "SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", - " LocalLimitExec: fetch=2", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "LocalLimitExec: fetch=2", - " SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_do_not_pushdown_through_limit() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - // let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source); - let input = Arc::new(SortExec::new( - LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), - source, - )); - let limit = Arc::new(GlobalLimitExec::new(input, 0, Some(5))) as _; - let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], limit); - - let expected_input = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " GlobalLimitExec: skip=0, fetch=5", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " GlobalLimitExec: skip=0, fetch=5", - " SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_remove_unnecessary_spm1() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let input = - sort_preserving_merge_exec(vec![sort_expr("non_nullable_col", &schema)], source); - let input2 = - sort_preserving_merge_exec(vec![sort_expr("non_nullable_col", &schema)], input); - let physical_plan = - sort_preserving_merge_exec(vec![sort_expr("nullable_col", &schema)], input2); - - let expected_input = [ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortPreservingMergeExec: [non_nullable_col@1 ASC]", - " SortPreservingMergeExec: [non_nullable_col@1 ASC]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_change_wrong_sorting() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let sort_exprs = vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]; - let sort = sort_exec(vec![sort_exprs[0].clone()], source); - let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); - let expected_input = [ - "SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_change_wrong_sorting2() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let sort_exprs = vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]; - let spm1 = sort_preserving_merge_exec(sort_exprs.clone(), source); - let sort2 = sort_exec(vec![sort_exprs[0].clone()], spm1); - let physical_plan = sort_preserving_merge_exec(vec![sort_exprs[1].clone()], sort2); - - let expected_input = [ - "SortPreservingMergeExec: [non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " SortPreservingMergeExec: [nullable_col@0 ASC, non_nullable_col@1 ASC]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_multiple_sort_window_exec() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - - let sort_exprs1 = vec![sort_expr("nullable_col", &schema)]; - let sort_exprs2 = vec![ - sort_expr("nullable_col", &schema), - sort_expr("non_nullable_col", &schema), - ]; - - let sort1 = sort_exec(sort_exprs1.clone(), source); - let window_agg1 = bounded_window_exec("non_nullable_col", sort_exprs1.clone(), sort1); - let window_agg2 = bounded_window_exec("non_nullable_col", sort_exprs2, window_agg1); - // let filter_exec = sort_exec; - let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2); - - let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]"]; - - let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " SortExec: expr=[nullable_col@0 ASC, non_nullable_col@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]"]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -// With new change in SortEnforcement EnforceSorting->EnforceDistribution->EnforceSorting -// should produce same result with EnforceDistribution+EnforceSorting -// This enables us to use EnforceSorting possibly before EnforceDistribution -// Given that it will be called at least once after last EnforceDistribution. The reason is that -// EnforceDistribution may invalidate ordering invariant. -async fn test_commutativity() -> Result<()> { - let schema = create_test_schema()?; - let config = ConfigOptions::new(); - - let memory_exec = memory_exec(&schema); - let sort_exprs = LexOrdering::new(vec![sort_expr("nullable_col", &schema)]); - let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec); - let repartition = repartition_exec(window); - - let orig_plan = - Arc::new(SortExec::new(sort_exprs, repartition)) as Arc; - let actual = get_plan_string(&orig_plan); - let expected_input = vec![ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_eq!( - expected_input, actual, - "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_input:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let mut plan = orig_plan.clone(); - let rules = vec![ - Arc::new(EnforceDistribution::new()) as Arc, - Arc::new(EnforceSorting::new()) as Arc, - ]; - for rule in rules { - plan = rule.optimize(plan, &config)?; - } - let first_plan = plan.clone(); - - let mut plan = orig_plan.clone(); - let rules = vec![ - Arc::new(EnforceSorting::new()) as Arc, - Arc::new(EnforceDistribution::new()) as Arc, - Arc::new(EnforceSorting::new()) as Arc, - ]; - for rule in rules { - plan = rule.optimize(plan, &config)?; - } - let second_plan = plan.clone(); - - assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan)); - Ok(()) -} - -#[tokio::test] -async fn test_coalesce_propagate() -> Result<()> { - let schema = create_test_schema()?; - let source = memory_exec(&schema); - let repartition = repartition_exec(source); - let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition)); - let repartition = repartition_exec(coalesce_partitions); - let sort_exprs = LexOrdering::new(vec![sort_expr("nullable_col", &schema)]); - // Add local sort - let sort = Arc::new( - SortExec::new(sort_exprs.clone(), repartition).with_preserve_partitioning(true), - ) as _; - let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort); - let sort = sort_exec(sort_exprs, spm); - - let physical_plan = sort.clone(); - // Sort Parallelize rule should end Coalesce + Sort linkage when Sort is Global Sort - // Also input plan is not valid as it is. We need to add SortExec before SortPreservingMergeExec. - let expected_input = [ - "SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - let expected_optimized = [ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - - Ok(()) -} - -#[tokio::test] -async fn test_replace_with_partial_sort2() -> Result<()> { - let schema = create_test_schema3()?; - let input_sort_exprs = vec![sort_expr("a", &schema), sort_expr("c", &schema)]; - let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs); - - let physical_plan = sort_exec( - vec![ - sort_expr("a", &schema), - sort_expr("c", &schema), - sort_expr("d", &schema), - ], - unbounded_input, - ); - - let expected_input = [ - "SortExec: expr=[a@0 ASC, c@2 ASC, d@3 ASC], preserve_partitioning=[false]", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]" - ]; - // let optimized - let expected_optimized = [ - "PartialSortExec: expr=[a@0 ASC, c@2 ASC, d@3 ASC], common_prefix_length=[2]", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - Ok(()) -} - -#[tokio::test] -async fn test_push_with_required_input_ordering_prohibited() -> Result<()> { - // SortExec: expr=[b] <-- can't push this down - // RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order - // SortExec: expr=[a] - // MemoryExec - let schema = create_test_schema3()?; - let sort_exprs_a = LexOrdering::new(vec![sort_expr("a", &schema)]); - let sort_exprs_b = LexOrdering::new(vec![sort_expr("b", &schema)]); - let plan = memory_exec(&schema); - let plan = sort_exec(sort_exprs_a.clone(), plan); - let plan = RequirementsTestExec::new(plan) - .with_required_input_ordering(sort_exprs_a) - .with_maintains_input_order(true) - .into_arc(); - let plan = sort_exec(sort_exprs_b, plan); - - let expected_input = [ - "SortExec: expr=[b@1 ASC], preserve_partitioning=[false]", - " RequiredInputOrderingExec", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - // should not be able to push shorts - let expected_no_change = expected_input; - assert_optimized!(expected_input, expected_no_change, plan, true); - Ok(()) -} - -// test when the required input ordering is satisfied so could push through -#[tokio::test] -async fn test_push_with_required_input_ordering_allowed() -> Result<()> { - // SortExec: expr=[a,b] <-- can push this down (as it is compatible with the required input ordering) - // RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order - // SortExec: expr=[a] - // MemoryExec - let schema = create_test_schema3()?; - let sort_exprs_a = LexOrdering::new(vec![sort_expr("a", &schema)]); - let sort_exprs_ab = - LexOrdering::new(vec![sort_expr("a", &schema), sort_expr("b", &schema)]); - let plan = memory_exec(&schema); - let plan = sort_exec(sort_exprs_a.clone(), plan); - let plan = RequirementsTestExec::new(plan) - .with_required_input_ordering(sort_exprs_a) - .with_maintains_input_order(true) - .into_arc(); - let plan = sort_exec(sort_exprs_ab, plan); - - let expected_input = [ - "SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]", - " RequiredInputOrderingExec", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - // should able to push shorts - let expected = [ - "RequiredInputOrderingExec", - " SortExec: expr=[a@0 ASC, b@1 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ]; - assert_optimized!(expected_input, expected, plan, true); - Ok(()) -} - -#[tokio::test] -async fn test_replace_with_partial_sort() -> Result<()> { - let schema = create_test_schema3()?; - let input_sort_exprs = vec![sort_expr("a", &schema)]; - let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs); - - let physical_plan = sort_exec( - vec![sort_expr("a", &schema), sort_expr("c", &schema)], - unbounded_input, - ); - - let expected_input = [ - "SortExec: expr=[a@0 ASC, c@2 ASC], preserve_partitioning=[false]", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]" - ]; - let expected_optimized = [ - "PartialSortExec: expr=[a@0 ASC, c@2 ASC], common_prefix_length=[1]", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - Ok(()) -} - -#[tokio::test] -async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()> { - let schema = create_test_schema3()?; - let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)]; - let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs); - - let physical_plan = sort_exec( - vec![ - sort_expr("a", &schema), - sort_expr("b", &schema), - sort_expr("c", &schema), - ], - unbounded_input, - ); - let expected_input = [ - "SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false]", - " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]" - ]; - let expected_no_change = expected_input; - assert_optimized!(expected_input, expected_no_change, physical_plan, true); - Ok(()) -} diff --git a/datafusion/physical-optimizer/tests/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/tests/limited_distinct_aggregation.rs deleted file mode 100644 index 6427d4def1a8..000000000000 --- a/datafusion/physical-optimizer/tests/limited_distinct_aggregation.rs +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Tests for [`LimitedDistinctAggregation`] physical optimizer rule - -use std::sync::Arc; - -use arrow_schema::DataType; -use datafusion_common::Result; -use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::{cast, col}; -use datafusion_physical_optimizer::test_utils::{ - assert_plan_matches_expected, build_group_by, mock_data, TestAggregate, -}; -use datafusion_physical_plan::{ - aggregates::{AggregateExec, AggregateMode}, - expressions, - limit::LocalLimitExec, - ExecutionPlan, -}; - -#[test] -fn test_no_group_by() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema, vec![]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema, /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[], aggr=[]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) -} - -#[test] -fn test_has_aggregate_expression() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema, vec!["a".to_string()]), - vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */ - vec![None], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) -} - -#[test] -fn test_has_filter() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec - // the `a > 1` filter is applied in the AggregateExec - let filter_expr = Some(expressions::binary( - col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?); - let agg = TestAggregate::new_count_star(); - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */ - vec![filter_expr], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) -} diff --git a/datafusion/physical-optimizer/tests/mod.rs b/datafusion/physical-optimizer/tests/mod.rs deleted file mode 100644 index 0e5eab515590..000000000000 --- a/datafusion/physical-optimizer/tests/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -mod enforce_sorting; -mod limited_distinct_aggregation; -mod sanity_checker; diff --git a/datafusion/physical-optimizer/tests/sanity_checker.rs b/datafusion/physical-optimizer/tests/sanity_checker.rs deleted file mode 100644 index e21385cd9fd0..000000000000 --- a/datafusion/physical-optimizer/tests/sanity_checker.rs +++ /dev/null @@ -1,308 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Tests for [`SanityCheckPlan`] physical optimizer rule - -use std::sync::Arc; - -use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::config::ConfigOptions; -use datafusion_common::Result; -use datafusion_expr::JoinType; -use datafusion_physical_expr::expressions::col; -use datafusion_physical_expr::Partitioning; -use datafusion_physical_optimizer::test_utils::{ - bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, - repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, -}; -use datafusion_physical_optimizer::{sanity_checker::*, PhysicalOptimizerRule}; -use datafusion_physical_plan::displayable; -use datafusion_physical_plan::repartition::RepartitionExec; -use datafusion_physical_plan::ExecutionPlan; - -fn create_test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) -} - -fn create_test_schema2() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])) -} - -/// Check if sanity checker should accept or reject plans. -fn assert_sanity_check(plan: &Arc, is_sane: bool) { - let sanity_checker = SanityCheckPlan::new(); - let opts = ConfigOptions::default(); - assert_eq!( - sanity_checker.optimize(plan.clone(), &opts).is_ok(), - is_sane - ); -} - -/// Check if the plan we created is as expected by comparing the plan -/// formatted as a string. -fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { - let plan_str = displayable(plan).indent(true).to_string(); - let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); - assert_eq!(actual_lines, expected_lines); -} - -#[tokio::test] -/// Tests that plan is valid when the sort requirements are satisfied. -async fn test_bounded_window_agg_sort_requirement() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr_options( - "c9", - &source.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - let sort = sort_exec(sort_exprs.clone(), source); - let bw = bounded_window_exec("c9", sort_exprs, sort); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]" - ]); - assert_sanity_check(&bw, true); - Ok(()) -} - -#[tokio::test] -/// Tests that plan is invalid when the sort requirements are not satisfied. -async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr_options( - "c9", - &source.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - let bw = bounded_window_exec("c9", sort_exprs, source); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " MemoryExec: partitions=1, partition_sizes=[0]" - ]); - // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. - assert_sanity_check(&bw, false); - Ok(()) -} - -#[tokio::test] -/// A valid when a single partition requirement -/// is satisfied. -async fn test_global_limit_single_partition() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = global_limit_exec(source); - - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&limit, true); - Ok(()) -} - -#[tokio::test] -/// An invalid plan when a single partition requirement -/// is not satisfied. -async fn test_global_limit_multi_partition() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = global_limit_exec(repartition_exec(source)); - - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. - assert_sanity_check(&limit, false); - Ok(()) -} - -#[tokio::test] -/// A plan with no requirements should satisfy. -async fn test_local_limit() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = local_limit_exec(source); - - assert_plan( - limit.as_ref(), - vec![ - "LocalLimitExec: fetch=100", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&limit, true); - Ok(()) -} - -#[tokio::test] -/// Valid plan with multiple children satisfy both order and distribution. -async fn test_sort_merge_join_satisfied() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let source2 = memory_exec(&schema2); - let sort_opts = SortOptions::default(); - let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; - let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; - let left = sort_exec(sort_exprs1, source1); - let right = sort_exec(sort_exprs2, source2); - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(vec![right_jcol.clone()], 10), - )?); - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&smj, true); - Ok(()) -} - -#[tokio::test] -/// Invalid case when the order is not satisfied by the 2nd -/// child. -async fn test_sort_merge_join_order_missing() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let right = memory_exec(&schema2); - let sort_exprs1 = vec![sort_expr_options( - "c9", - &source1.schema(), - SortOptions::default(), - )]; - let left = sort_exec(sort_exprs1, source1); - // Missing sort of the right child here.. - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(vec![right_jcol.clone()], 10), - )?); - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. - assert_sanity_check(&smj, false); - Ok(()) -} - -#[tokio::test] -/// Invalid case when the distribution is not satisfied by the 2nd -/// child. -async fn test_sort_merge_join_dist_missing() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let source2 = memory_exec(&schema2); - let sort_opts = SortOptions::default(); - let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; - let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; - let left = sort_exec(sort_exprs1, source1); - let right = sort_exec(sort_exprs2, source2); - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::RoundRobinBatch(10), - )?); - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - // Missing hash partitioning on right child. - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. - assert_sanity_check(&smj, false); - Ok(()) -} From 6c31b7e46ec7d9df8edf730829a2a196b6cc9dae Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 22 Jan 2025 17:56:41 -0500 Subject: [PATCH 2/2] cleanu --- .../physical_optimizer/aggregate_statistics.rs | 18 +++++++++++++++++- .../physical_optimizer/enforce_sorting.rs | 8 +++----- .../limited_distinct_aggregation.rs | 8 ++++---- .../core/tests/physical_optimizer/mod.rs | 1 + .../tests/physical_optimizer/sanity_checker.rs | 12 +++++++----- .../src/aggregate_statistics.rs | 2 +- .../src/enforce_distribution.rs | 2 ++ .../src/limited_distinct_aggregation.rs | 2 +- .../physical-optimizer/src/sanity_checker.rs | 2 ++ 9 files changed, 38 insertions(+), 17 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index b82fc046a9a9..95da1767a3ef 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -1,3 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. use datafusion_common::config::ConfigOptions; use datafusion_execution::TaskContext; @@ -9,7 +25,6 @@ use datafusion_physical_plan::ExecutionPlan; use std::sync::Arc; use datafusion_common::Result; -use datafusion_expr_common::operator::Operator; use datafusion_physical_plan::aggregates::PhysicalGroupBy; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -21,6 +36,7 @@ use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_int64_array; +use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{self, cast}; use datafusion_physical_optimizer::test_utils::TestAggregate; use datafusion_physical_plan::aggregates::AggregateMode; diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index d83458f11674..4ca8381f2672 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -43,10 +43,10 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; -use rstest::rstest; use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::sorts::sort::SortExec; +use rstest::rstest; /// Create a csv exec for tests fn csv_exec_ordered( @@ -1240,8 +1240,6 @@ async fn test_not_replaced_with_partial_sort_for_bounded_input() -> Result<()> { Ok(()) } - - /// Runs the sort enforcement optimizer and asserts the plan /// against the original and expected plans /// @@ -1606,7 +1604,7 @@ async fn test_remove_unnecessary_sort6() -> Result<()> { LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), source, ) - .with_fetch(Some(2)), + .with_fetch(Some(2)), ); let physical_plan = sort_exec( vec![ @@ -1647,7 +1645,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> { LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]), input, ) - .with_fetch(Some(2)), + .with_fetch(Some(2)), ) as Arc; let expected_input = [ diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs index 21526fcb7d55..7c04d1239bc8 100644 --- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -27,10 +27,12 @@ use datafusion::prelude::SessionContext; use datafusion_common::Result; use datafusion_execution::config::SessionConfig; use datafusion_expr::Operator; -use datafusion_physical_expr::{expressions, expressions::col, PhysicalSortExpr}; use datafusion_physical_expr::expressions::cast; +use datafusion_physical_expr::{expressions, expressions::col, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_optimizer::test_utils::{assert_plan_matches_expected, build_group_by, mock_data, schema, TestAggregate}; +use datafusion_physical_optimizer::test_utils::{ + assert_plan_matches_expected, build_group_by, mock_data, schema, TestAggregate, +}; use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode}, collect, @@ -266,8 +268,6 @@ fn test_has_order_by() -> Result<()> { Ok(()) } - - #[test] fn test_no_group_by() -> Result<()> { let source = mock_data()?; diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index e4128d8fb8ef..19da0ef7855e 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -17,6 +17,7 @@ //! Physical Optimizer integration tests +mod aggregate_statistics; mod combine_partial_final_agg; mod enforce_distribution; mod enforce_sorting; diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 60693da8dd1e..7f723ae67e8e 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -15,21 +15,24 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; use datafusion::prelude::{CsvReadOptions, SessionContext}; use datafusion_common::{JoinType, Result}; +use std::sync::Arc; use async_trait::async_trait; use datafusion_common::config::ConfigOptions; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::Partitioning; -use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; -use datafusion_physical_optimizer::test_utils::{bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec}; -use datafusion_physical_plan::{displayable, ExecutionPlan}; +use datafusion_physical_optimizer::test_utils::{ + bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, + repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, +}; +use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::{displayable, ExecutionPlan}; async fn register_current_csv( ctx: &SessionContext, @@ -370,7 +373,6 @@ async fn test_analyzer() -> Result<()> { Ok(()) } - fn create_test_schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 44991c751059..a9b02188a7a2 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -147,4 +147,4 @@ fn take_optimizable_value_from_statistics( value.map(|val| (val, agg_expr.name().to_string())) } -// Tests are in tests/cases/aggregate_statistics.rs +// See tests in datafusion/core/tests/physical_optimizer diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 2f37c672bcda..176c1e69aba8 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1405,3 +1405,5 @@ fn update_children(mut dist_context: DistributionContext) -> Result