Skip to content

Stop copying LogicalPlan and Exprs in ReplaceDistinctWithAggregate #10460

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp};
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::{Column, Result};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Column, Result};
use datafusion_expr::expr_rewriter::normalize_cols;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{
aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
Expand Down Expand Up @@ -66,20 +68,24 @@ impl ReplaceDistinctWithAggregate {
}

impl OptimizerRule for ReplaceDistinctWithAggregate {
fn try_optimize(
fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: &LogicalPlan,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Distinct(Distinct::All(input)) => {
let group_expr = expand_wildcard(input.schema(), input, None)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
let group_expr = expand_wildcard(input.schema(), &input, None)?;
let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new(
input,
group_expr,
vec![],
)?);
Ok(Some(aggregate))
Ok(Transformed::yes(aggr_plan))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
select_expr,
Expand All @@ -88,13 +94,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
input,
schema,
})) => {
let expr_cnt = on_expr.len();

// Construct the aggregation expression to be used to fetch the selected expressions.
let aggr_expr = select_expr
.iter()
.into_iter()
.map(|e| {
Expr::AggregateFunction(AggregateFunction::new(
AggregateFunctionFunc::FirstValue,
vec![e.clone()],
vec![e],
false,
None,
sort_expr.clone(),
Expand All @@ -103,45 +111,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
})
.collect::<Vec<Expr>>();

let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?;
let group_expr = normalize_cols(on_expr, input.as_ref())?;
Comment on lines +114 to +115
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe unnecessary?


// Build the aggregation plan
let plan = LogicalPlanBuilder::from(input.as_ref().clone())
.aggregate(on_expr.clone(), aggr_expr.to_vec())?
.build()?;
let plan = LogicalPlan::Aggregate(Aggregate::try_new(
input, group_expr, aggr_expr,
)?);
// TODO use LogicalPlanBuilder directly rather than recreating the Aggregate
// when https://github.com/apache/datafusion/issues/10485 is available
let lpb = LogicalPlanBuilder::from(plan);

let plan = if let Some(sort_expr) = sort_expr {
let plan = if let Some(mut sort_expr) = sort_expr {
// While sort expressions were used in the `FIRST_VALUE` aggregation itself above,
// this on it's own isn't enough to guarantee the proper output order of the grouping
// (`ON`) expression, so we need to sort those as well.
LogicalPlanBuilder::from(plan)
.sort(sort_expr[..on_expr.len()].to_vec())?
.build()?

// truncate the sort_expr to the length of on_expr
sort_expr.truncate(expr_cnt);

lpb.sort(sort_expr)?.build()?
} else {
plan
lpb.build()?
};

// Whereas the aggregation plan by default outputs both the grouping and the aggregation
// expressions, for `DISTINCT ON` we only need to emit the original selection expressions.

let project_exprs = plan
.schema()
.iter()
.skip(on_expr.len())
.skip(expr_cnt)
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
Ok(col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name()))
col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name())
})
.collect::<Result<Vec<Expr>>>()?;
.collect::<Vec<Expr>>();

let plan = LogicalPlanBuilder::from(plan)
.project(project_exprs)?
.build()?;

Ok(Some(plan))
Ok(Transformed::yes(plan))
}
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called ReplaceDistinctWithAggregate::rewrite")
}

fn name(&self) -> &str {
"replace_distinct_aggregate"
}
Expand Down