Skip to content

Commit 48a28af

Browse files
mertak-synnada, berkaysynnada, ozankabak
authored
Feature: AggregateMonotonicity (#14271)
* add monotonic function definitions for aggregate expressions
* fix benchmark results
* set prefer_existing_sort to true in sqllogictests
* set prefer_existing_sort to true in sqllogictests
* fix typo
* re-add test_utils.rs changes to the new file
* clone input with Arc
* inject aggr expr indices separate stubs and count_udafs
* remove redundant file
* add Sum monotonicity change monotonicity to return an Enum rather than Option<bool> fix indices re-add monotonicity tests
* fix sql logic tests
* fix sql logic tests
* update docs
* review part 1
* fix the tests
* revert slt's
* simplify terms
* Update mod.rs
* remove unnecessary computations
* remove index calc
* Update mod.rs
* Apply suggestions from code review
* add slt
* remove aggregate changes, tests already give expected results
* fix clippy
* remove one row sorts
* Improve comments
* Use a short name for set monotonicity

---------

Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent 53728b3 commit 48a28af

File tree

19 files changed

+581
-73
lines changed

19 files changed

+581
-73
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 209 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
use std::sync::Arc;
1919

2020
use crate::physical_optimizer::test_utils::{
21-
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
21+
aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
22+
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
2223
coalesce_partitions_exec, create_test_schema, create_test_schema2,
23-
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
24-
local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
25-
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
26-
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
24+
create_test_schema3, create_test_schema4, filter_exec, global_limit_exec,
25+
hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
26+
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
27+
sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec,
28+
RequirementsTestExec,
2729
};
2830

2931
use datafusion_physical_plan::displayable;
@@ -238,6 +240,208 @@ async fn test_remove_unnecessary_sort5() -> Result<()> {
238240
Ok(())
239241
}
240242

243+
#[tokio::test]
244+
async fn test_bounded_window_set_monotonic_no_partition() -> Result<()> {
245+
let schema = create_test_schema()?;
246+
247+
let source = parquet_exec_sorted(&schema, vec![]);
248+
249+
let sort_exprs = vec![sort_expr_options(
250+
"nullable_col",
251+
&schema,
252+
SortOptions {
253+
descending: true,
254+
nulls_first: false,
255+
},
256+
)];
257+
let sort = sort_exec(sort_exprs.clone(), source);
258+
259+
let bounded_window = bounded_window_exec("nullable_col", vec![], sort);
260+
261+
let output_schema = bounded_window.schema();
262+
let sort_exprs2 = vec![sort_expr_options(
263+
"count",
264+
&output_schema,
265+
SortOptions {
266+
descending: false,
267+
nulls_first: false,
268+
},
269+
)];
270+
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);
271+
272+
let expected_input = [
273+
"SortExec: expr=[count@2 ASC NULLS LAST], preserve_partitioning=[false]",
274+
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
275+
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
276+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
277+
];
278+
let expected_optimized = [
279+
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
280+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
281+
];
282+
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
283+
284+
Ok(())
285+
}
286+
287+
#[tokio::test]
288+
async fn test_bounded_plain_window_set_monotonic_with_partitions() -> Result<()> {
289+
let schema = create_test_schema()?;
290+
291+
let source = parquet_exec_sorted(&schema, vec![]);
292+
293+
let sort_exprs = vec![sort_expr_options(
294+
"nullable_col",
295+
&schema,
296+
SortOptions {
297+
descending: true,
298+
nulls_first: false,
299+
},
300+
)];
301+
let sort = sort_exec(sort_exprs.clone(), source);
302+
303+
let partition_bys = &[col("nullable_col", &schema)?];
304+
let bounded_window = bounded_window_exec_with_partition(
305+
"non_nullable_col",
306+
vec![],
307+
partition_bys,
308+
sort,
309+
false,
310+
);
311+
312+
let output_schema = bounded_window.schema();
313+
let sort_exprs2 = vec![sort_expr_options(
314+
"count",
315+
&output_schema,
316+
SortOptions {
317+
descending: false,
318+
nulls_first: false,
319+
},
320+
)];
321+
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);
322+
323+
let expected_input = [
324+
"SortExec: expr=[count@2 ASC NULLS LAST], preserve_partitioning=[false]",
325+
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
326+
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
327+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
328+
];
329+
let expected_optimized = [
330+
"SortExec: expr=[count@2 ASC NULLS LAST], preserve_partitioning=[false]",
331+
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
332+
" SortExec: expr=[nullable_col@0 ASC NULLS LAST], preserve_partitioning=[false]",
333+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
334+
];
335+
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
336+
337+
Ok(())
338+
}
339+
340+
#[tokio::test]
341+
async fn test_bounded_plain_window_set_monotonic_with_partitions_partial() -> Result<()> {
342+
let schema = create_test_schema()?;
343+
344+
let source = parquet_exec_sorted(&schema, vec![]);
345+
346+
let sort_exprs = vec![sort_expr_options(
347+
"nullable_col",
348+
&schema,
349+
SortOptions {
350+
descending: true,
351+
nulls_first: false,
352+
},
353+
)];
354+
let sort = sort_exec(sort_exprs.clone(), source);
355+
356+
let partition_bys = &[col("nullable_col", &schema)?];
357+
let bounded_window = bounded_window_exec_with_partition(
358+
"non_nullable_col",
359+
vec![],
360+
partition_bys,
361+
sort,
362+
false,
363+
);
364+
365+
let output_schema = bounded_window.schema();
366+
let sort_exprs2 = vec![
367+
sort_expr_options(
368+
"nullable_col",
369+
&output_schema,
370+
SortOptions {
371+
descending: true,
372+
nulls_first: false,
373+
},
374+
),
375+
sort_expr_options(
376+
"count",
377+
&output_schema,
378+
SortOptions {
379+
descending: false,
380+
nulls_first: false,
381+
},
382+
),
383+
];
384+
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);
385+
386+
let expected_input = [
387+
"SortExec: expr=[nullable_col@0 DESC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
388+
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
389+
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
390+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
391+
];
392+
let expected_optimized = [
393+
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
394+
" SortExec: expr=[nullable_col@0 DESC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]",
395+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
396+
];
397+
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
398+
399+
Ok(())
400+
}
401+
402+
#[tokio::test]
403+
async fn test_bounded_window_non_set_monotonic_sort() -> Result<()> {
404+
let schema = create_test_schema4()?;
405+
let sort_exprs = vec![sort_expr_options(
406+
"a",
407+
&schema,
408+
SortOptions {
409+
descending: true,
410+
nulls_first: false,
411+
},
412+
)];
413+
let source = parquet_exec_sorted(&schema, sort_exprs.clone());
414+
let sort = sort_exec(sort_exprs.clone(), source);
415+
416+
let bounded_window =
417+
bounded_window_exec_non_set_monotonic("a", sort_exprs.clone(), sort);
418+
let output_schema = bounded_window.schema();
419+
let sort_exprs2 = vec![sort_expr_options(
420+
"avg",
421+
&output_schema,
422+
SortOptions {
423+
descending: false,
424+
nulls_first: false,
425+
},
426+
)];
427+
let physical_plan = sort_exec(sort_exprs2.clone(), bounded_window);
428+
429+
let expected_input = [
430+
"SortExec: expr=[avg@5 ASC NULLS LAST], preserve_partitioning=[false]",
431+
" BoundedWindowAggExec: wdw=[avg: Ok(Field { name: \"avg\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
432+
" SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]",
433+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST]",
434+
];
435+
let expected_optimized = [
436+
"SortExec: expr=[avg@5 ASC NULLS LAST], preserve_partitioning=[false]",
437+
" BoundedWindowAggExec: wdw=[avg: Ok(Field { name: \"avg\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
438+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST]",
439+
];
440+
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
441+
442+
Ok(())
443+
}
444+
241445
#[tokio::test]
242446
async fn test_do_not_remove_sort_with_limit() -> Result<()> {
243447
let schema = create_test_schema()?;

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion_common::{JoinType, Result};
3333
use datafusion_execution::object_store::ObjectStoreUrl;
3434
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3535
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
36+
use datafusion_functions_aggregate::average::avg_udaf;
3637
use datafusion_functions_aggregate::count::count_udaf;
3738
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
3839
use datafusion_physical_expr::expressions::col;
@@ -120,6 +121,17 @@ pub fn create_test_schema3() -> Result<SchemaRef> {
120121
Ok(schema)
121122
}
122123

124+
// Generate a schema which consists of 5 columns (a, b, c, d, e): a-d are UInt64, e is Int64
125+
pub fn create_test_schema4() -> Result<SchemaRef> {
126+
let a = Field::new("a", DataType::UInt64, true);
127+
let b = Field::new("b", DataType::UInt64, false);
128+
let c = Field::new("c", DataType::UInt64, true);
129+
let d = Field::new("d", DataType::UInt64, false);
130+
let e = Field::new("e", DataType::Int64, false);
131+
let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
132+
Ok(schema)
133+
}
134+
123135
pub fn sort_merge_join_exec(
124136
left: Arc<dyn ExecutionPlan>,
125137
right: Arc<dyn ExecutionPlan>,
@@ -188,15 +200,58 @@ pub fn bounded_window_exec(
188200
col_name: &str,
189201
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
190202
input: Arc<dyn ExecutionPlan>,
203+
) -> Arc<dyn ExecutionPlan> {
204+
bounded_window_exec_with_partition(col_name, sort_exprs, &[], input, false)
205+
}
206+
207+
pub fn bounded_window_exec_with_partition(
208+
col_name: &str,
209+
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
210+
partition_by: &[Arc<dyn PhysicalExpr>],
211+
input: Arc<dyn ExecutionPlan>,
212+
should_reverse: bool,
213+
) -> Arc<dyn ExecutionPlan> {
214+
let sort_exprs: LexOrdering = sort_exprs.into_iter().collect();
215+
let schema = input.schema();
216+
let mut window_expr = create_window_expr(
217+
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
218+
"count".to_owned(),
219+
&[col(col_name, &schema).unwrap()],
220+
partition_by,
221+
sort_exprs.as_ref(),
222+
Arc::new(WindowFrame::new(Some(false))),
223+
schema.as_ref(),
224+
false,
225+
)
226+
.unwrap();
227+
if should_reverse {
228+
window_expr = window_expr.get_reverse_expr().unwrap();
229+
}
230+
231+
Arc::new(
232+
BoundedWindowAggExec::try_new(
233+
vec![window_expr],
234+
Arc::clone(&input),
235+
vec![],
236+
InputOrderMode::Sorted,
237+
)
238+
.unwrap(),
239+
)
240+
}
241+
242+
pub fn bounded_window_exec_non_set_monotonic(
243+
col_name: &str,
244+
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
245+
input: Arc<dyn ExecutionPlan>,
191246
) -> Arc<dyn ExecutionPlan> {
192247
let sort_exprs: LexOrdering = sort_exprs.into_iter().collect();
193248
let schema = input.schema();
194249

195250
Arc::new(
196251
BoundedWindowAggExec::try_new(
197252
vec![create_window_expr(
198-
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
199-
"count".to_owned(),
253+
&WindowFunctionDefinition::AggregateUDF(avg_udaf()),
254+
"avg".to_owned(),
200255
&[col(col_name, &schema).unwrap()],
201256
&[],
202257
sort_exprs.as_ref(),

datafusion/expr/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ pub use partition_evaluator::PartitionEvaluator;
9191
pub use sqlparser;
9292
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
9393
pub use udaf::{
94-
aggregate_doc_sections, AggregateUDF, AggregateUDFImpl, ReversedUDAF, StatisticsArgs,
94+
aggregate_doc_sections, AggregateUDF, AggregateUDFImpl, ReversedUDAF,
95+
SetMonotonicity, StatisticsArgs,
9596
};
9697
pub use udf::{
9798
scalar_doc_sections, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF,

datafusion/expr/src/udaf.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
389389

390390
/// Whether the aggregate function is nullable.
391391
///
392-
/// Nullable means that that the function could return `null` for any inputs.
392+
/// Nullable means that the function could return `null` for any inputs.
393393
/// For example, aggregate functions like `COUNT` always return a non null value
394394
/// but others like `MIN` will return `NULL` if there is nullable input.
395395
/// Note that if the function is declared as *not* nullable, make sure the [`AggregateUDFImpl::default_value`] is `non-null`
@@ -635,6 +635,12 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
635635
fn documentation(&self) -> Option<&Documentation> {
636636
None
637637
}
638+
639+
/// Indicates whether the aggregation function is monotonic as a set
640+
/// function. See [`SetMonotonicity`] for details.
641+
fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
642+
SetMonotonicity::NotMonotonic
643+
}
638644
}
639645

640646
impl PartialEq for dyn AggregateUDFImpl {
@@ -818,6 +824,27 @@ pub mod aggregate_doc_sections {
818824
};
819825
}
820826

827+
/// Indicates whether an aggregation function is monotonic as a set
828+
/// function. A set function is monotonically increasing if its value
829+
/// never decreases as its argument grows (as a set). Formally, `f` is a
830+
/// monotonically increasing set function if `f(S) >= f(T)` whenever `S`
831+
/// is a superset of `T`.
832+
///
833+
/// For example `COUNT` and `MAX` are monotonically increasing as their
834+
/// values always increase (or stay the same) as new values are seen. On
835+
/// the other hand, `MIN` is monotonically decreasing as its value always
836+
/// decreases or stays the same as new values are seen.
837+
#[derive(Debug, Clone, PartialEq)]
838+
pub enum SetMonotonicity {
839+
/// Aggregate value increases or stays the same as the input set grows.
840+
Increasing,
841+
/// Aggregate value decreases or stays the same as the input set grows.
842+
Decreasing,
843+
/// Aggregate value may increase, decrease, or stay the same as the input
844+
/// set grows.
845+
NotMonotonic,
846+
}
847+
821848
#[cfg(test)]
822849
mod test {
823850
use crate::{AggregateUDF, AggregateUDFImpl};

datafusion/expr/src/window_frame.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,13 @@ impl WindowFrame {
291291
&& (self.end_bound.is_unbounded()
292292
|| self.end_bound == WindowFrameBound::CurrentRow)
293293
}
294+
295+
/// Is the window frame ever-expanding (it always grows in the superset sense).
296+
/// Useful when understanding if set-monotonicity properties of functions can
297+
/// be exploited.
298+
pub fn is_ever_expanding(&self) -> bool {
299+
self.start_bound.is_unbounded()
300+
}
294301
}
295302

296303
/// There are five ways to describe starting and ending frame boundaries:

0 commit comments

Comments
 (0)