feat: add raw aggregate udf planner #11371

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions datafusion/core/src/execution/session_state.rs
@@ -243,6 +243,7 @@ impl SessionState {
feature = "unicode_expressions"
))]
Arc::new(functions::planner::UserDefinedFunctionPlanner),
Arc::new(functions_aggregate::planner::AggregateUDFPlanner),
];

let mut new_self = SessionState {
23 changes: 23 additions & 0 deletions datafusion/expr/src/planner.rs
@@ -24,6 +24,7 @@ use datafusion_common::{
config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema,
Result, TableReference,
};
use sqlparser::ast::NullTreatment;

use crate::{AggregateUDF, Expr, GetFieldAccess, ScalarUDF, TableSource, WindowUDF};

@@ -161,6 +162,28 @@ pub trait ExprPlanner: Send + Sync {
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}

/// Plans a `RawAggregateUDF` based on the given input expressions.
///
/// Returns a `PlannerResult` containing either the planned aggregate function or the original
/// input expressions if planning is not possible.
fn plan_aggregate_udf(
&self,
aggregate_function: RawAggregateUDF,
) -> Result<PlannerResult<RawAggregateUDF>> {
Ok(PlannerResult::Original(aggregate_function))
Contributor:

If we agree to convert count() to count(1) in the planner, the conversion would look like this:

    fn plan_aggregate_udf(
        &self,
        aggregate_function: RawAggregateUDF,
    ) -> datafusion_common::Result<PlannerResult<RawAggregateUDF>> {

        let RawAggregateUDF { udf, args, distinct, filter, order_by, null_treatment } = aggregate_function;
        if udf.name() == "count" && args.is_empty() {
            let args = vec![lit(1)];
            let expr = Expr::AggregateFunction(AggregateFunction {
                func_def: datafusion_expr::expr::AggregateFunctionDefinition::UDF(udf),
                args,
                distinct,
                filter,
                order_by,
                null_treatment,
            });
            return Ok(PlannerResult::Planned(expr))
        }

        let aggregate_function = RawAggregateUDF { udf, args, distinct, filter, order_by, null_treatment };
        Ok(PlannerResult::Original(aggregate_function))
    }

Contributor Author:

I'm not sure what the path is after that for getting from count(1) to count(), as we're talking about in #11229 (something that also works in .select). I think until there's a clear path to that, I'm hesitant to expand the scope of this PR.

Contributor:

I think the conversion from count(1) to count() is more like an optimization (rather than something for the SQL planner), as it would apply equally to SQL and to the DataFrame APIs.

Perhaps we could add it as part of https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.AggregateUDFImpl.html#method.simplify 🤔

Contributor:

What I'm thinking for the DataFrame API is that we only provide one count_star() function, so we don't need to deal with function rewrites for the DataFrame APIs.

For SQL, count(), count(1) and count(*) are equivalent things that can all map to one single Expr, so I think it is possible to have the conversion in the planner.

Contributor:

> For SQL, count(), count(1) and count(*) are equivalent things that can all map to one single Expr, so I think it is possible to have the conversion in the planner.

I agree they are equivalent things, but I am thinking it may be hard to teach everyone who builds Exprs to call count_star rather than count(lit(1)) or count(Expr::Wildcard).

To be clear, I don't have a massively strong opinion, but since we don't control how people create Exprs, it seems like we aren't going to be able to prevent things like count(1).

Contributor @jayzhan211 (Jul 13, 2024):

The rewrite could be simple enough that we don't need to push it down to the optimizer.

pub fn count_star() -> Expr {
    Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
        count_udaf(),
        vec![Expr::Literal(COUNT_STAR_EXPANSION)],
        false,
        None,
        None,
        None,
    ))
}

pub fn count(expr: Expr) -> Expr {
    // For backward compatibility, could use count_star instead
    if let Expr::Wildcard { qualifier: _ } = expr {
        count_star()
    } else {
        Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
            count_udaf(),
            vec![expr],
            false,
            None,
            None,
            None,
        ))
    }
}

I think we need a clear division of roles between expression rewrites in the planner vs. rewrites in the optimizer.
ExprPlanner is the first spot where we get the expressions from SQLExpr and build up datafusion::Expr. Syntax rewrites like the dict {a: b}, compound identifiers like a.b, or equivalent-argument rewrites like count(*) are all good candidates for determining the expected datafusion::Expr in ExprPlanner. Others should be pushed down to the optimizer, since they may need constant evaluation or multiple passes.

If the reason to rewrite the expression in the optimizer is that it benefits both the SQL and DataFrame APIs, maybe redesigning the DataFrame API is what we need 🤔?

Contributor:

> I think we need a clear division of roles between expression rewrites in the planner vs. rewrites in the optimizer. ExprPlanner is the first spot where we get the expressions from SQLExpr and build up datafusion::Expr. Syntax rewrites like the dict {a: b}, compound identifiers like a.b, or equivalent-argument rewrites like count(*) are all good candidates for determining the expected datafusion::Expr in ExprPlanner. Others should be pushed down to the optimizer, since they may need constant evaluation or multiple passes.
>
> If the reason to rewrite the expression in the optimizer is that it benefits both the SQL and DataFrame APIs, maybe redesigning the DataFrame API is what we need 🤔?

I guess in my mind the distinction is:

  1. the planner is a mechanical transformation to something that could be run
  2. the optimizer rewrites expressions / plans to keep the exact same semantics (output results) but faster

So in my mind the transformation of count(1) --> count() is an optimization, as it potentially avoids having to accumulate a known column during execution. Maybe there is a better way to think about it.

This might be a good distinction to clarify in the documentation 🤔

Contributor:

> I guess in my mind the distinction is:
> 1. the planner is a mechanical transformation to something that could be run
> 2. the optimizer rewrites expressions / plans to keep the exact same semantics (output results) but faster

I agree with this. A rewrite in the logical optimizer should result in something better.

> So in my mind the transformation of count(1) --> count() is an optimization, as it potentially avoids having to accumulate a known column during execution. Maybe there is a better way to think about it.

In the case of count, the forms count(*), count(1), and count() are equivalent; none of them is faster than the others.

The optimization that counts rows is not in the logical optimizer but in the physical optimizer. Therefore, the transformation from count() and count(*) to count(1) doesn't really fit the second point, IMO. What we are doing before the physical optimizer is transforming the equivalent exprs into a single one (count(1)), so we only need to process that one in the physical optimizer. Note that we could process those three expressions individually in the physical optimizer, but standardizing to a single expression as early as possible might reduce the complexity of downstream query processing. To me it is more like rewriting expressions into an equivalent expression.

> This might be a good distinction to clarify in the documentation 🤔

It would be nice to do so; we should clarify the roles of these two.

}
}

// An `AggregateUDF` to be planned.
#[derive(Debug, Clone)]
pub struct RawAggregateUDF {
Contributor Author @tshauck (Jul 9, 2024):

One thought I had during this is it might be nice to also update the naming pattern for these intermediate structs... maybe from Raw* to Plannable*?

Contributor:

I agree that Plannable sounds better (perhaps we can propose the change as a follow-on PR)?

What do you think @samuelcolvin and @jayzhan211 ?

pub udf: Arc<crate::AggregateUDF>,
pub args: Vec<Expr>,
pub distinct: bool,
pub filter: Option<Box<Expr>>,
pub order_by: Option<Vec<Expr>>,
pub null_treatment: Option<NullTreatment>,
}

/// An operator with two arguments to plan
1 change: 1 addition & 0 deletions datafusion/functions-aggregate/src/lib.rs
@@ -62,6 +62,7 @@ pub mod covariance;
pub mod first_last;
pub mod hyperloglog;
pub mod median;
pub mod planner;
pub mod regr;
pub mod stddev;
pub mod sum;
29 changes: 29 additions & 0 deletions datafusion/functions-aggregate/src/planner.rs
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawAggregateUDF};

pub struct AggregateUDFPlanner;

impl ExprPlanner for AggregateUDFPlanner {
    fn plan_aggregate_udf(
        &self,
        aggregate_function: RawAggregateUDF,
    ) -> datafusion_common::Result<PlannerResult<RawAggregateUDF>> {
        Ok(PlannerResult::Original(aggregate_function))
    }
}
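
For orientation, here is a minimal sketch (not part of this PR) of what a downstream ExprPlanner built on the new hook could look like: it rewrites a zero-argument call to a hypothetical "row_count" UDAF into row_count(1), following the trait signature and RawAggregateUDF fields added in this diff. The RowCountPlanner struct and the "row_count" name are assumptions made up for illustration.

// Sketch only: RowCountPlanner and the "row_count" UDAF name are hypothetical.
use datafusion_common::Result;
use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawAggregateUDF};
use datafusion_expr::{expr, lit, Expr};

struct RowCountPlanner;

impl ExprPlanner for RowCountPlanner {
    fn plan_aggregate_udf(
        &self,
        aggregate_function: RawAggregateUDF,
    ) -> Result<PlannerResult<RawAggregateUDF>> {
        let RawAggregateUDF {
            udf,
            args,
            distinct,
            filter,
            order_by,
            null_treatment,
        } = aggregate_function;

        // Rewrite row_count() into row_count(1), mirroring the count() idea discussed above.
        if udf.name() == "row_count" && args.is_empty() {
            let planned = Expr::AggregateFunction(expr::AggregateFunction::new_udf(
                udf,
                vec![lit(1)],
                distinct,
                filter,
                order_by,
                null_treatment,
            ));
            return Ok(PlannerResult::Planned(planned));
        }

        // Otherwise hand the raw aggregate back unchanged so the SQL planner
        // (or a later ExprPlanner in the list) can plan it.
        Ok(PlannerResult::Original(RawAggregateUDF {
            udf,
            args,
            distinct,
            filter,
            order_by,
            null_treatment,
        }))
    }
}

Because such a planner runs in the same loop as the default AggregateUDFPlanner, whichever planner is registered first sees the RawAggregateUDF first; this is also why the review note further down observes that planner extensions take precedence over the built-in aggregates from the registry.
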
25 changes: 22 additions & 3 deletions datafusion/sql/src/expr/function.rs
@@ -21,6 +21,7 @@ use datafusion_common::{
internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema,
Dependency, Result,
};
use datafusion_expr::planner::{PlannerResult, RawAggregateUDF};
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
use datafusion_expr::{
expr, AggregateFunction, Expr, ExprSchemable, WindowFrame, WindowFunctionDefinition,
@@ -335,7 +336,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
} else {
// User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
if let Some(udf) = self.context_provider.get_aggregate_meta(&name) {
let order_by = self.order_by_to_sort_expr(
&order_by,
schema,
@@ -349,13 +350,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
.transpose()?
.map(Box::new);
return Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
fm,

let raw_aggregate_function = RawAggregateUDF {
udf,
args,
distinct,
filter,
order_by,
null_treatment,
};

for planner in self.planners.iter() {
Contributor:

This means that any planner extensions would have precedence over the built-in aggregate functions from the Registry. I think that is ok.

Contributor:

We can avoid the clone like this:

for planner in self.planners.iter() {
    match planner.plan_aggregate_udf(raw_aggregate_function)? {
        PlannerResult::Planned(e) => return Ok(e),
        PlannerResult::Original(e) => {
            raw_aggregate_function = e;
        }
    }
}

return Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
    raw_aggregate_function.udf,
    raw_aggregate_function.args,
    raw_aggregate_function.distinct,
    raw_aggregate_function.filter,
    raw_aggregate_function.order_by,
    raw_aggregate_function.null_treatment,
)));

if let PlannerResult::Planned(aggregate_function) =
planner.plan_aggregate_udf(raw_aggregate_function.clone())?
{
return Ok(aggregate_function);
}
}

return Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
raw_aggregate_function.udf,
raw_aggregate_function.args,
raw_aggregate_function.distinct,
raw_aggregate_function.filter,
raw_aggregate_function.order_by,
raw_aggregate_function.null_treatment,
)));
}
