Skip to content

Commit c7f10e1

Browse files
committed
cleanup
Signed-off-by: jayzhan211 <[email protected]>
1 parent d9b0cff commit c7f10e1

File tree

4 files changed

+23
-64
lines changed

4 files changed

+23
-64
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ use datafusion_expr::{
8585
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
8686
WindowFrame, WindowFrameBound, WriteOp,
8787
};
88-
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
8988
use datafusion_physical_expr::expressions::Literal;
9089
use datafusion_physical_expr::LexOrdering;
9190
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -1843,9 +1842,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
18431842
// TODO: Remove this after array_agg are all udafs
18441843
let (agg_expr, filter, order_by) = match func_def {
18451844
AggregateFunctionDefinition::UDF(udf)
1846-
if udf.name() == "array_agg" && !distinct && order_by.is_none() =>
1845+
if udf.name() == "ARRAY_AGG" && (*distinct || order_by.is_some()) =>
18471846
{
1848-
let sort_exprs = order_by.clone().unwrap_or(vec![]);
18491847
let physical_sort_exprs = match order_by {
18501848
Some(exprs) => Some(create_physical_sort_exprs(
18511849
exprs,
@@ -1856,16 +1854,15 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
18561854
};
18571855
let ordering_reqs: Vec<PhysicalSortExpr> =
18581856
physical_sort_exprs.clone().unwrap_or(vec![]);
1859-
let agg_expr = udaf::create_aggregate_expr(
1860-
&array_agg_udaf(),
1857+
let fun = aggregates::AggregateFunction::ArrayAgg;
1858+
let agg_expr = aggregates::create_aggregate_expr(
1859+
&fun,
1860+
*distinct,
18611861
&physical_args,
1862-
args,
1863-
&sort_exprs,
18641862
&ordering_reqs,
18651863
physical_input_schema,
18661864
name,
18671865
ignore_nulls,
1868-
*distinct,
18691866
)?;
18701867
(agg_expr, filter, physical_sort_exprs)
18711868
}

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ fn aggregate_batch(
218218
Some(filter) => Cow::Owned(batch_filter(&batch, filter)?),
219219
None => Cow::Borrowed(&batch),
220220
};
221+
println!("expr: {:?}", expr);
221222
// 1.3
222223
let values = &expr
223224
.iter()

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,7 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
244244
let mut distinct = false;
245245

246246
// TODO: remove
247-
let inner = if aggr_expr.downcast_ref::<ArrayAgg>().is_some() {
248-
protobuf::AggregateFunction::ArrayAgg
249-
} else if aggr_expr.downcast_ref::<DistinctArrayAgg>().is_some() {
247+
let inner = if aggr_expr.downcast_ref::<DistinctArrayAgg>().is_some() {
250248
distinct = true;
251249
protobuf::AggregateFunction::ArrayAgg
252250
} else if aggr_expr.downcast_ref::<OrderSensitiveArrayAgg>().is_some() {

datafusion/sql/src/expr/function.rs

Lines changed: 16 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -334,53 +334,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
334334
return Ok(expr);
335335
}
336336
} else {
337-
// TODO: remove this after all array_agg are udafs
338-
let order_by = self.order_by_to_sort_expr(
339-
&order_by,
340-
schema,
341-
planner_context,
342-
true,
343-
None,
344-
)?;
345-
let order_by = (!order_by.is_empty()).then_some(order_by);
346-
// println!("name: {:?}", name);
347-
// println!("order: {:?}", order_by);
348-
// println!("distinct: {:?}", distinct);
349-
350-
// if name == "array_agg" && (order_by.is_some() || distinct) {
351-
// if let Ok(fun) = AggregateFunction::from_str(&name) {
352-
// let args =
353-
// self.function_args_to_expr(args, schema, planner_context)?;
354-
// let filter: Option<Box<Expr>> = filter
355-
// .map(|e| {
356-
// self.sql_expr_to_logical_expr(*e, schema, planner_context)
357-
// })
358-
// .transpose()?
359-
// .map(Box::new);
360-
361-
// println!("actualk order_by: {:?}", order_by);
362-
363-
// return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
364-
// fun,
365-
// args,
366-
// distinct,
367-
// filter,
368-
// order_by,
369-
// null_treatment,
370-
// )));
371-
// };
372-
// }
373-
374337
// User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
375338
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
376-
// let order_by = self.order_by_to_sort_expr(
377-
// &order_by,
378-
// schema,
379-
// planner_context,
380-
// true,
381-
// None,
382-
// )?;
383-
// let order_by = (!order_by.is_empty()).then_some(order_by);
339+
let order_by = self.order_by_to_sort_expr(
340+
&order_by,
341+
schema,
342+
planner_context,
343+
true,
344+
None,
345+
)?;
346+
let order_by = (!order_by.is_empty()).then_some(order_by);
384347
let args = self.function_args_to_expr(args, schema, planner_context)?;
385348
let filter: Option<Box<Expr>> = filter
386349
.map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
@@ -398,14 +361,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
398361

399362
// next, aggregate built-ins
400363
if let Ok(fun) = AggregateFunction::from_str(&name) {
401-
// let order_by = self.order_by_to_sort_expr(
402-
// &order_by,
403-
// schema,
404-
// planner_context,
405-
// true,
406-
// None,
407-
// )?;
408-
// let order_by = (!order_by.is_empty()).then_some(order_by);
364+
let order_by = self.order_by_to_sort_expr(
365+
&order_by,
366+
schema,
367+
planner_context,
368+
true,
369+
None,
370+
)?;
371+
let order_by = (!order_by.is_empty()).then_some(order_by);
409372
let args = self.function_args_to_expr(args, schema, planner_context)?;
410373
let filter: Option<Box<Expr>> = filter
411374
.map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))

0 commit comments

Comments
 (0)