From e1b9c1469422530ea7d28fcfd04f8bd5202c106d Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 16 Aug 2024 22:42:26 +0800 Subject: [PATCH 1/5] refactor: Move LimitedDistinctAggregation to physical-optimizer crate --- .../limited_distinct_aggregation.rs | 611 ------------------ datafusion/core/src/physical_optimizer/mod.rs | 1 - .../limited_distinct_aggregation.rs | 443 +++++++++++++ .../core/tests/physical_optimizer/mod.rs | 2 + .../tests/physical_optimizer/test_util.rs | 57 ++ datafusion/physical-optimizer/Cargo.toml | 1 + datafusion/physical-optimizer/src/lib.rs | 1 + .../src/limited_distinct_aggregation.rs | 190 ++++++ 8 files changed, 694 insertions(+), 612 deletions(-) delete mode 100644 datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs create mode 100644 datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs create mode 100644 datafusion/core/tests/physical_optimizer/test_util.rs create mode 100644 datafusion/physical-optimizer/src/limited_distinct_aggregation.rs diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs deleted file mode 100644 index b181ad9051ed..000000000000 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ /dev/null @@ -1,611 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! A special-case optimizer rule that pushes limit into a grouped aggregation -//! which has no aggregate expressions or sorting requirements - -use std::sync::Arc; - -use crate::physical_plan::aggregates::AggregateExec; -use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; - -use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::Result; - -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use itertools::Itertools; - -/// An optimizer rule that passes a `limit` hint into grouped aggregations which don't require all -/// rows in the group to be processed for correctness. 
Example queries fitting this description are: -/// `SELECT distinct l_orderkey FROM lineitem LIMIT 10;` -/// `SELECT l_orderkey FROM lineitem GROUP BY l_orderkey LIMIT 10;` -pub struct LimitedDistinctAggregation {} - -impl LimitedDistinctAggregation { - /// Create a new `LimitedDistinctAggregation` - pub fn new() -> Self { - Self {} - } - - fn transform_agg( - aggr: &AggregateExec, - limit: usize, - ) -> Option> { - // rules for transforming this Aggregate are held in this method - if !aggr.is_unordered_unfiltered_group_by_distinct() { - return None; - } - - // We found what we want: clone, copy the limit down, and return modified node - let new_aggr = AggregateExec::try_new( - *aggr.mode(), - aggr.group_expr().clone(), - aggr.aggr_expr().to_vec(), - aggr.filter_expr().to_vec(), - aggr.input().clone(), - aggr.input_schema(), - ) - .expect("Unable to copy Aggregate!") - .with_limit(Some(limit)); - Some(Arc::new(new_aggr)) - } - - /// transform_limit matches an `AggregateExec` as the child of a `LocalLimitExec` - /// or `GlobalLimitExec` and pushes the limit into the aggregation as a soft limit when - /// there is a group by, but no sorting, no aggregate expressions, and no filters in the - /// aggregation - fn transform_limit(plan: Arc) -> Option> { - let limit: usize; - let mut global_fetch: Option = None; - let mut global_skip: usize = 0; - let children: Vec>; - let mut is_global_limit = false; - if let Some(local_limit) = plan.as_any().downcast_ref::() { - limit = local_limit.fetch(); - children = local_limit.children().into_iter().cloned().collect(); - } else if let Some(global_limit) = plan.as_any().downcast_ref::() - { - global_fetch = global_limit.fetch(); - global_fetch?; - global_skip = global_limit.skip(); - // the aggregate must read at least fetch+skip number of rows - limit = global_fetch.unwrap() + global_skip; - children = global_limit.children().into_iter().cloned().collect(); - is_global_limit = true - } else { - return None; - } - let child = 
children.iter().exactly_one().ok()?; - // ensure there is no output ordering; can this rule be relaxed? - if plan.output_ordering().is_some() { - return None; - } - // ensure no ordering is required on the input - if plan.required_input_ordering()[0].is_some() { - return None; - } - - // if found_match_aggr is true, match_aggr holds a parent aggregation whose group_by - // must match that of a child aggregation in order to rewrite the child aggregation - let mut match_aggr: Arc = plan; - let mut found_match_aggr = false; - - let mut rewrite_applicable = true; - let closure = |plan: Arc| { - if !rewrite_applicable { - return Ok(Transformed::no(plan)); - } - if let Some(aggr) = plan.as_any().downcast_ref::() { - if found_match_aggr { - if let Some(parent_aggr) = - match_aggr.as_any().downcast_ref::() - { - if !parent_aggr.group_expr().eq(aggr.group_expr()) { - // a partial and final aggregation with different groupings disqualifies - // rewriting the child aggregation - rewrite_applicable = false; - return Ok(Transformed::no(plan)); - } - } - } - // either we run into an Aggregate and transform it, or disable the rewrite - // for subsequent children - match Self::transform_agg(aggr, limit) { - None => {} - Some(new_aggr) => { - match_aggr = plan; - found_match_aggr = true; - return Ok(Transformed::yes(new_aggr)); - } - } - } - rewrite_applicable = false; - Ok(Transformed::no(plan)) - }; - let child = child.clone().transform_down(closure).data().ok()?; - if is_global_limit { - return Some(Arc::new(GlobalLimitExec::new( - child, - global_skip, - global_fetch, - ))); - } - Some(Arc::new(LocalLimitExec::new(child, limit))) - } -} - -impl Default for LimitedDistinctAggregation { - fn default() -> Self { - Self::new() - } -} - -impl PhysicalOptimizerRule for LimitedDistinctAggregation { - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result> { - if config.optimizer.enable_distinct_aggregation_soft_limit { - plan.transform_down(|plan| { - Ok( - if let 
Some(plan) = - LimitedDistinctAggregation::transform_limit(plan.clone()) - { - Transformed::yes(plan) - } else { - Transformed::no(plan) - }, - ) - }) - .data() - } else { - Ok(plan) - } - } - - fn name(&self) -> &str { - "LimitedDistinctAggregation" - } - - fn schema_check(&self) -> bool { - true - } -} - -#[cfg(test)] -mod tests { - - use super::*; - use crate::physical_optimizer::enforce_distribution::tests::{ - parquet_exec_with_sort, schema, trim_plan_display, - }; - use crate::physical_plan::aggregates::PhysicalGroupBy; - use crate::physical_plan::collect; - use crate::physical_plan::memory::MemoryExec; - use crate::prelude::SessionContext; - use crate::test_util::TestAggregate; - - use arrow::array::Int32Array; - use arrow::compute::SortOptions; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use arrow::util::pretty::pretty_format_batches; - use arrow_schema::SchemaRef; - use datafusion_execution::config::SessionConfig; - use datafusion_expr::Operator; - use datafusion_physical_expr::expressions::{cast, col}; - use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr}; - use datafusion_physical_plan::aggregates::AggregateMode; - use datafusion_physical_plan::displayable; - - fn mock_data() -> Result> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])); - - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(Int32Array::from(vec![ - Some(1), - Some(2), - None, - Some(1), - Some(4), - Some(5), - ])), - Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(6), - Some(2), - Some(8), - Some(9), - ])), - ], - )?; - - Ok(Arc::new(MemoryExec::try_new( - &[vec![batch]], - Arc::clone(&schema), - None, - )?)) - } - - fn assert_plan_matches_expected( - plan: &Arc, - expected: &[&str], - ) -> Result<()> { - let expected_lines: Vec<&str> = expected.to_vec(); - let session_ctx = SessionContext::new(); - 
let state = session_ctx.state(); - - let optimized = LimitedDistinctAggregation::new() - .optimize(Arc::clone(plan), state.config_options())?; - - let optimized_result = displayable(optimized.as_ref()).indent(true).to_string(); - let actual_lines = trim_plan_display(&optimized_result); - - assert_eq!( - &expected_lines, &actual_lines, - "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", - expected_lines, actual_lines - ); - - Ok(()) - } - - async fn assert_results_match_expected( - plan: Arc, - expected: &str, - ) -> Result<()> { - let cfg = SessionConfig::new().with_target_partitions(1); - let ctx = SessionContext::new_with_config(cfg); - let batches = collect(plan, ctx.task_ctx()).await?; - let actual = format!("{}", pretty_format_batches(&batches)?); - assert_eq!(actual, expected); - Ok(()) - } - - pub fn build_group_by( - input_schema: &SchemaRef, - columns: Vec, - ) -> PhysicalGroupBy { - let mut group_by_expr: Vec<(Arc, String)> = vec![]; - for column in columns.iter() { - group_by_expr.push((col(column, input_schema).unwrap(), column.to_string())); - } - PhysicalGroupBy::new_single(group_by_expr.clone()) - } - - #[tokio::test] - async fn test_partial_final() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT a FROM MemoryExec GROUP BY a LIMIT 4;`, Partial/Final AggregateExec - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - Arc::new(partial_agg), /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(final_agg), - 4, // fetch - ); - // expected to push the limit to the Partial and Final AggregateExecs 
- let expected = [ - "LocalLimitExec: fetch=4", - "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[], lim=[4]", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[], lim=[4]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 1 | -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; - Ok(()) - } - - #[tokio::test] - async fn test_single_local() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT a FROM MemoryExec GROUP BY a LIMIT 4;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 4, // fetch - ); - // expected to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=4", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 1 | -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; - Ok(()) - } - - #[tokio::test] - async fn test_single_global() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT a FROM MemoryExec GROUP BY a LIMIT 4;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = GlobalLimitExec::new( - 
Arc::new(single_agg), - 1, // skip - Some(3), // fetch - ); - // expected to push the skip+fetch limit to the AggregateExec - let expected = [ - "GlobalLimitExec: skip=1, fetch=3", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; - Ok(()) - } - - #[tokio::test] - async fn test_distinct_cols_different_than_group_by_cols() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT distinct a FROM MemoryExec GROUP BY a, b LIMIT 4;`, Single/Single AggregateExec - let group_by_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string(), "b".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let distinct_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - Arc::new(group_by_agg), /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(distinct_agg), - 4, // fetch - ); - // expected to push the limit to the outer AggregateExec only - let expected = [ - "LocalLimitExec: fetch=4", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", - "AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 1 | -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; - Ok(()) - } - - #[test] - fn test_no_group_by() -> 
Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec![]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[], aggr=[]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) - } - - #[test] - fn test_has_aggregate_expression() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![agg.count_expr(&schema)], /* aggr_expr */ - vec![None], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) - } - - #[test] - fn test_has_filter() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - - // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec - // the `a > 1` filter is applied in the AggregateExec - let filter_expr = Some(expressions::binary( - 
expressions::col("a", &schema)?, - Operator::Gt, - cast(expressions::lit(1u32), &schema, DataType::Int32)?, - &schema, - )?); - let agg = TestAggregate::new_count_star(); - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![agg.count_expr(&schema)], /* aggr_expr */ - vec![filter_expr], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) - } - - #[test] - fn test_has_order_by() -> Result<()> { - let sort_key = vec![PhysicalSortExpr { - expr: expressions::col("a", &schema()).unwrap(), - options: SortOptions::default(), - }]; - let source = parquet_exec_with_sort(vec![sort_key]); - let schema = source.schema(); - - // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec - // the `a > 1` filter is applied in the AggregateExec - let single_agg = AggregateExec::try_new( - AggregateMode::Single, - build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![], /* filter_expr */ - source, /* input */ - schema.clone(), /* input_schema */ - )?; - let limit_exec = LocalLimitExec::new( - Arc::new(single_agg), - 10, // fetch - ); - // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], ordering_mode=Sorted", - "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", - ]; - let 
plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - Ok(()) - } -} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 0e68a05d855c..c32c77043f15 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -26,7 +26,6 @@ pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; pub mod join_selection; -pub mod limited_distinct_aggregation; pub mod optimizer; pub mod projection_pushdown; pub mod pruning; diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs new file mode 100644 index 000000000000..d7efc4ffdc7a --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Tests for the limited distinct aggregation optimizer rule + +use std::sync::Arc; + +use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation; +use datafusion_physical_plan::aggregates::AggregateExec; +use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::ExecutionPlan; + +use datafusion_common::Result; + +use datafusion_physical_optimizer::PhysicalOptimizerRule; + +use datafusion::prelude::SessionContext; +use datafusion::test_util::TestAggregate; +use datafusion_physical_plan::aggregates::PhysicalGroupBy; +use datafusion_physical_plan::collect; +use datafusion_physical_plan::memory::MemoryExec; + +use arrow::array::Int32Array; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::SchemaRef; +use datafusion_execution::config::SessionConfig; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{cast, col}; +use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_plan::aggregates::AggregateMode; +use datafusion_physical_plan::displayable; + +use crate::physical_optimizer::test_util::trim_plan_display; + +use super::test_util::{parquet_exec_with_sort, schema}; + +fn mock_data() -> Result> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![ + Some(1), + Some(2), + None, + Some(1), + Some(4), + Some(5), + ])), + Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(6), + Some(2), + Some(8), + Some(9), + ])), + ], + )?; + + Ok(Arc::new(MemoryExec::try_new( + &[vec![batch]], + Arc::clone(&schema), + None, + )?)) +} + +fn assert_plan_matches_expected( + plan: &Arc, + expected: &[&str], +) -> Result<()> 
{ + let expected_lines: Vec<&str> = expected.to_vec(); + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + let optimized = LimitedDistinctAggregation::new() + .optimize(Arc::clone(plan), state.config_options())?; + + let optimized_result = displayable(optimized.as_ref()).indent(true).to_string(); + let actual_lines = trim_plan_display(&optimized_result); + + assert_eq!( + &expected_lines, &actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected_lines, actual_lines + ); + + Ok(()) +} + +async fn assert_results_match_expected( + plan: Arc, + expected: &str, +) -> Result<()> { + let cfg = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(cfg); + let batches = collect(plan, ctx.task_ctx()).await?; + let actual = format!("{}", pretty_format_batches(&batches)?); + assert_eq!(actual, expected); + Ok(()) +} + +pub fn build_group_by(input_schema: &SchemaRef, columns: Vec) -> PhysicalGroupBy { + let mut group_by_expr: Vec<(Arc, String)> = vec![]; + for column in columns.iter() { + group_by_expr.push((col(column, input_schema).unwrap(), column.to_string())); + } + PhysicalGroupBy::new_single(group_by_expr.clone()) +} + +#[tokio::test] +async fn test_partial_final() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT a FROM MemoryExec GROUP BY a LIMIT 4;`, Partial/Final AggregateExec + let partial_agg = AggregateExec::try_new( + AggregateMode::Partial, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + Arc::new(partial_agg), /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(final_agg), + 4, // 
fetch + ); + // expected to push the limit to the Partial and Final AggregateExecs + let expected = [ + "LocalLimitExec: fetch=4", + "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[], lim=[4]", + "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[], lim=[4]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + let expected = r#" ++---+ +| a | ++---+ +| 1 | +| 2 | +| | +| 4 | ++---+ +"# + .trim(); + assert_results_match_expected(plan, expected).await?; + Ok(()) +} + +#[tokio::test] +async fn test_single_local() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT a FROM MemoryExec GROUP BY a LIMIT 4;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 4, // fetch + ); + // expected to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=4", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + let expected = r#" ++---+ +| a | ++---+ +| 1 | +| 2 | +| | +| 4 | ++---+ +"# + .trim(); + assert_results_match_expected(plan, expected).await?; + Ok(()) +} + +#[tokio::test] +async fn test_single_global() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT a FROM MemoryExec GROUP BY a LIMIT 3 OFFSET 1;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema.clone(), /* 
input_schema */ + )?; + let limit_exec = GlobalLimitExec::new( + Arc::new(single_agg), + 1, // skip + Some(3), // fetch + ); + // expected to push the skip+fetch limit to the AggregateExec + let expected = [ + "GlobalLimitExec: skip=1, fetch=3", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + let expected = r#" ++---+ +| a | ++---+ +| 2 | +| | +| 4 | ++---+ +"# + .trim(); + assert_results_match_expected(plan, expected).await?; + Ok(()) +} + +#[tokio::test] +async fn test_distinct_cols_different_than_group_by_cols() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT distinct a FROM MemoryExec GROUP BY a, b LIMIT 4;`, Single/Single AggregateExec + let group_by_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string(), "b".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let distinct_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + Arc::new(group_by_agg), /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(distinct_agg), + 4, // fetch + ); + // expected to push the limit to the outer AggregateExec only + let expected = [ + "LocalLimitExec: fetch=4", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", + "AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + let expected = r#" ++---+ +| a | ++---+ +| 1 | +| 2 | +| | +| 4 | ++---+ +"# + .trim(); + assert_results_match_expected(plan, 
expected).await?; + Ok(()) +} + +#[test] +fn test_no_group_by() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT FROM MemoryExec LIMIT 10;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec![]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[], aggr=[]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} + +#[test] +fn test_has_aggregate_expression() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + let agg = TestAggregate::new_count_star(); + + // `SELECT a, COUNT(*) FROM MemoryExec GROUP BY a LIMIT 10;`, Single AggregateExec + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![agg.count_expr(&schema)], /* aggr_expr */ + vec![None], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} + +#[test] +fn test_has_filter() -> Result<()> { + let source = mock_data()?; + let schema = source.schema(); + + // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec + // the `a > 1` filter is applied in the 
AggregateExec + let filter_expr = Some(expressions::binary( + expressions::col("a", &schema)?, + Operator::Gt, + cast(expressions::lit(1u32), &schema, DataType::Int32)?, + &schema, + )?); + let agg = TestAggregate::new_count_star(); + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![agg.count_expr(&schema)], /* aggr_expr */ + vec![filter_expr], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", + "MemoryExec: partitions=1, partition_sizes=[1]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} + +#[test] +fn test_has_order_by() -> Result<()> { + let sort_key = vec![PhysicalSortExpr { + expr: expressions::col("a", &schema()).unwrap(), + options: SortOptions::default(), + }]; + let source = parquet_exec_with_sort(vec![sort_key]); + let schema = source.schema(); + + // `SELECT a FROM ParquetExec GROUP BY a LIMIT 10;`, Single AggregateExec + // the input is already sorted on `a`, so the AggregateExec runs with ordering_mode=Sorted + let single_agg = AggregateExec::try_new( + AggregateMode::Single, + build_group_by(&schema.clone(), vec!["a".to_string()]), + vec![], /* aggr_expr */ + vec![], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ + )?; + let limit_exec = LocalLimitExec::new( + Arc::new(single_agg), + 10, // fetch + ); + // expected not to push the limit to the AggregateExec + let expected = [ + "LocalLimitExec: fetch=10", + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], ordering_mode=Sorted", + "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; + let plan: Arc = Arc::new(limit_exec); + assert_plan_matches_expected(&plan, &expected)?; + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 904a8b9fbb38..149103cf3482 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -17,3 +17,5 @@ mod aggregate_statistics; mod limit_pushdown; +mod limited_distinct_aggregation; +mod test_util; diff --git a/datafusion/core/tests/physical_optimizer/test_util.rs b/datafusion/core/tests/physical_optimizer/test_util.rs new file mode 100644 index 000000000000..131b887c4ec7 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/test_util.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Test utilities for physical optimizer tests + +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::datasource::{ + listing::PartitionedFile, + physical_plan::{FileScanConfig, ParquetExec}, +}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_physical_expr::PhysicalSortExpr; + +/// create a single parquet file that is sorted +pub(crate) fn parquet_exec_with_sort( + output_ordering: Vec>, +) -> Arc { + ParquetExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema()) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(output_ordering), + ) + .build_arc() +} + +pub(crate) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Boolean, true), + ])) +} + +pub(crate) fn trim_plan_display(plan: &str) -> Vec<&str> { + plan.split('\n') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect() +} diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 125ea6acc77f..a7f675b37a60 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -36,3 +36,4 @@ datafusion-common = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +itertools = { workspace = true } diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index d54e6dbcab8f..caebdcc927ae 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -19,6 +19,7 @@ pub mod aggregate_statistics; pub mod limit_pushdown; +pub mod limited_distinct_aggregation; mod optimizer; pub mod 
output_requirements; diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs new file mode 100644 index 000000000000..f8c33136736c --- /dev/null +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A special-case optimizer rule that pushes limit into a grouped aggregation +//! which has no aggregate expressions or sorting requirements + +use std::sync::Arc; + +use datafusion_physical_plan::aggregates::AggregateExec; +use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; + +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::Result; + +use crate::PhysicalOptimizerRule; +use itertools::Itertools; + +/// An optimizer rule that passes a `limit` hint into grouped aggregations which don't require all +/// rows in the group to be processed for correctness. 
Example queries fitting this description are: +/// `SELECT distinct l_orderkey FROM lineitem LIMIT 10;` +/// `SELECT l_orderkey FROM lineitem GROUP BY l_orderkey LIMIT 10;` +pub struct LimitedDistinctAggregation {} + +impl LimitedDistinctAggregation { + /// Create a new `LimitedDistinctAggregation` + pub fn new() -> Self { + Self {} + } + + fn transform_agg( + aggr: &AggregateExec, + limit: usize, + ) -> Option> { + // rules for transforming this Aggregate are held in this method + if !aggr.is_unordered_unfiltered_group_by_distinct() { + return None; + } + + // We found what we want: clone, copy the limit down, and return modified node + let new_aggr = AggregateExec::try_new( + *aggr.mode(), + aggr.group_expr().clone(), + aggr.aggr_expr().to_vec(), + aggr.filter_expr().to_vec(), + aggr.input().clone(), + aggr.input_schema(), + ) + .expect("Unable to copy Aggregate!") + .with_limit(Some(limit)); + Some(Arc::new(new_aggr)) + } + + /// transform_limit matches an `AggregateExec` as the child of a `LocalLimitExec` + /// or `GlobalLimitExec` and pushes the limit into the aggregation as a soft limit when + /// there is a group by, but no sorting, no aggregate expressions, and no filters in the + /// aggregation + fn transform_limit(plan: Arc) -> Option> { + let limit: usize; + let mut global_fetch: Option = None; + let mut global_skip: usize = 0; + let children: Vec>; + let mut is_global_limit = false; + if let Some(local_limit) = plan.as_any().downcast_ref::() { + limit = local_limit.fetch(); + children = local_limit.children().into_iter().cloned().collect(); + } else if let Some(global_limit) = plan.as_any().downcast_ref::() + { + global_fetch = global_limit.fetch(); + global_fetch?; + global_skip = global_limit.skip(); + // the aggregate must read at least fetch+skip number of rows + limit = global_fetch.unwrap() + global_skip; + children = global_limit.children().into_iter().cloned().collect(); + is_global_limit = true + } else { + return None; + } + let child = 
children.iter().exactly_one().ok()?; + // ensure there is no output ordering; can this rule be relaxed? + if plan.output_ordering().is_some() { + return None; + } + // ensure no ordering is required on the input + if plan.required_input_ordering()[0].is_some() { + return None; + } + + // if found_match_aggr is true, match_aggr holds a parent aggregation whose group_by + // must match that of a child aggregation in order to rewrite the child aggregation + let mut match_aggr: Arc = plan; + let mut found_match_aggr = false; + + let mut rewrite_applicable = true; + let closure = |plan: Arc| { + if !rewrite_applicable { + return Ok(Transformed::no(plan)); + } + if let Some(aggr) = plan.as_any().downcast_ref::() { + if found_match_aggr { + if let Some(parent_aggr) = + match_aggr.as_any().downcast_ref::() + { + if !parent_aggr.group_expr().eq(aggr.group_expr()) { + // a partial and final aggregation with different groupings disqualifies + // rewriting the child aggregation + rewrite_applicable = false; + return Ok(Transformed::no(plan)); + } + } + } + // either we run into an Aggregate and transform it, or disable the rewrite + // for subsequent children + match Self::transform_agg(aggr, limit) { + None => {} + Some(new_aggr) => { + match_aggr = plan; + found_match_aggr = true; + return Ok(Transformed::yes(new_aggr)); + } + } + } + rewrite_applicable = false; + Ok(Transformed::no(plan)) + }; + let child = child.clone().transform_down(closure).data().ok()?; + if is_global_limit { + return Some(Arc::new(GlobalLimitExec::new( + child, + global_skip, + global_fetch, + ))); + } + Some(Arc::new(LocalLimitExec::new(child, limit))) + } +} + +impl Default for LimitedDistinctAggregation { + fn default() -> Self { + Self::new() + } +} + +impl PhysicalOptimizerRule for LimitedDistinctAggregation { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + if config.optimizer.enable_distinct_aggregation_soft_limit { + plan.transform_down(|plan| { + Ok( + if let 
Some(plan) = + LimitedDistinctAggregation::transform_limit(plan.clone()) + { + Transformed::yes(plan) + } else { + Transformed::no(plan) + }, + ) + }) + .data() + } else { + Ok(plan) + } + } + + fn name(&self) -> &str { + "LimitedDistinctAggregation" + } + + fn schema_check(&self) -> bool { + true + } +} From e27b5242ea9bd173d61743ec068d46784245ebe0 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 16 Aug 2024 22:48:58 +0800 Subject: [PATCH 2/5] chore: Update cargo.lock --- datafusion-cli/Cargo.lock | 89 +++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 90995c1d116a..22f889de804b 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -272,7 +272,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.3.0", + "indexmap 2.4.0", "lexical-core", "num", "serde", @@ -875,12 +875,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.10" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9e8aabfac534be767c909e0690571677d49f41bd8465ae876fe043d52ba5292" +checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -1163,7 +1164,7 @@ dependencies = [ "glob", "half", "hashbrown 0.14.5", - "indexmap 2.3.0", + "indexmap 2.4.0", "itertools 0.12.1", "log", "num-traits", @@ -1386,7 +1387,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.5", - "indexmap 2.3.0", + "indexmap 2.4.0", "itertools 0.12.1", "log", "paste", @@ -1415,7 +1416,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "hex", - "indexmap 2.3.0", + "indexmap 2.4.0", "itertools 0.12.1", "log", "paste", @@ -1457,6 +1458,7 @@ dependencies = [ "datafusion-execution", "datafusion-physical-expr", "datafusion-physical-plan", + "itertools 0.12.1", ] [[package]] @@ -1483,7 +1485,7 @@ dependencies = [ "futures", 
"half", "hashbrown 0.14.5", - "indexmap 2.3.0", + "indexmap 2.4.0", "itertools 0.12.1", "log", "once_cell", @@ -1848,7 +1850,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.3.0", + "indexmap 2.4.0", "slab", "tokio", "tokio-util", @@ -1867,7 +1869,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.3.0", + "indexmap 2.4.0", "slab", "tokio", "tokio-util", @@ -2159,9 +2161,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -2226,9 +2228,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" dependencies = [ "wasm-bindgen", ] @@ -2305,9 +2307,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.156" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "libflate" @@ -2729,7 +2731,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.3.0", + "indexmap 2.4.0", ] [[package]] @@ -3388,18 +3390,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.207" +version = "1.0.208" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "5665e14a49a4ea1b91029ba7d3bca9f299e1f7cfa194388ccc20f14743e784f2" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.207" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aea2634c86b0e8ef2cfdc0c340baede54ec27b1e46febd7f80dffb2aa44a00e" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", @@ -3408,9 +3410,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.124" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", "memchr", @@ -3441,6 +3443,12 @@ dependencies = [ "digest", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -3847,15 +3855,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -4057,19 +4065,20 @@ checksum = 
"9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" dependencies = [ "cfg-if", + "once_cell", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" dependencies = [ "bumpalo", "log", @@ -4082,9 +4091,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.42" +version = "0.4.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" dependencies = [ "cfg-if", "js-sys", @@ -4094,9 +4103,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4104,9 +4113,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", @@ -4117,9 +4126,9 @@ dependencies = [ [[package]] name = 
"wasm-bindgen-shared" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" [[package]] name = "wasm-streams" @@ -4136,9 +4145,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" dependencies = [ "js-sys", "wasm-bindgen", From 48534244708a7265913d282f8ed4256e5a6a3d66 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 16 Aug 2024 23:42:13 +0800 Subject: [PATCH 3/5] chore: Fix clippy --- .../physical-optimizer/src/limited_distinct_aggregation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index f8c33136736c..28fc02ed0c61 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -58,7 +58,7 @@ impl LimitedDistinctAggregation { aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), - aggr.input().clone(), + aggr.input().to_owned(), aggr.input_schema(), ) .expect("Unable to copy Aggregate!") @@ -138,7 +138,7 @@ impl LimitedDistinctAggregation { rewrite_applicable = false; Ok(Transformed::no(plan)) }; - let child = child.clone().transform_down(closure).data().ok()?; + let child = child.to_owned().transform_down(closure).data().ok()?; if is_global_limit { return Some(Arc::new(GlobalLimitExec::new( child, @@ -166,7 +166,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { plan.transform_down(|plan| { Ok( if let 
Some(plan) = - LimitedDistinctAggregation::transform_limit(plan.clone()) + LimitedDistinctAggregation::transform_limit(plan.to_owned()) { Transformed::yes(plan) } else { From ae9fbb877c2c97a80911a73424c88b14e240ee67 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Sat, 17 Aug 2024 11:28:50 +0800 Subject: [PATCH 4/5] Update datafusion/physical-optimizer/src/limited_distinct_aggregation.rs Co-authored-by: Andrew Lamb --- .../physical-optimizer/src/limited_distinct_aggregation.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index 28fc02ed0c61..e18e530072db 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -188,3 +188,5 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { true } } + +// See tests in datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs From fddb5261ce976838189a4b3fd37831904d2550b9 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 17 Aug 2024 11:31:58 +0800 Subject: [PATCH 5/5] chore: Clean import --- .../limited_distinct_aggregation.rs | 53 +++++++++---------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs index d7efc4ffdc7a..48389b0304f6 100644 --- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -17,39 +17,36 @@ //! 
Tests for the limited distinct aggregation optimizer rule -use std::sync::Arc; - -use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation; -use datafusion_physical_plan::aggregates::AggregateExec; -use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion_physical_plan::ExecutionPlan; - -use datafusion_common::Result; - -use datafusion_physical_optimizer::PhysicalOptimizerRule; +use super::test_util::{parquet_exec_with_sort, schema, trim_plan_display}; -use datafusion::prelude::SessionContext; -use datafusion::test_util::TestAggregate; -use datafusion_physical_plan::aggregates::PhysicalGroupBy; -use datafusion_physical_plan::collect; -use datafusion_physical_plan::memory::MemoryExec; +use std::sync::Arc; -use arrow::array::Int32Array; -use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; -use arrow::util::pretty::pretty_format_batches; +use arrow::{ + array::Int32Array, + compute::SortOptions, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + util::pretty::pretty_format_batches, +}; use arrow_schema::SchemaRef; +use datafusion::{prelude::SessionContext, test_util::TestAggregate}; +use datafusion_common::Result; use datafusion_execution::config::SessionConfig; use datafusion_expr::Operator; -use datafusion_physical_expr::expressions::{cast, col}; -use datafusion_physical_expr::{expressions, PhysicalExpr, PhysicalSortExpr}; -use datafusion_physical_plan::aggregates::AggregateMode; -use datafusion_physical_plan::displayable; - -use crate::physical_optimizer::test_util::trim_plan_display; - -use super::test_util::{parquet_exec_with_sort, schema}; +use datafusion_physical_expr::{ + expressions::{cast, col}, + PhysicalExpr, PhysicalSortExpr, +}; +use datafusion_physical_optimizer::{ + limited_distinct_aggregation::LimitedDistinctAggregation, PhysicalOptimizerRule, +}; +use datafusion_physical_plan::{ + 
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, + collect, displayable, expressions, + limit::{GlobalLimitExec, LocalLimitExec}, + memory::MemoryExec, + ExecutionPlan, +}; fn mock_data() -> Result> { let schema = Arc::new(Schema::new(vec![