
[MINOR]: Add GroupByOrderMode to the AggregateExec display #6141


Merged · 2 commits · Apr 27, 2023
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

//! Hash aggregation on ordered group by expressions
//! Output generated will itself have an ordering
//! and executor can run with bounded memory (can generate result in streaming cases)
//! This file implements streaming aggregation on ordered GROUP BY expressions.
//! Generated output will itself have an ordering and the executor can run with
//! bounded memory, ensuring composability in streaming cases.

use std::cmp::min;
use std::ops::Range;
@@ -185,7 +185,7 @@ impl BoundedAggregateStream {

let row_accumulators = aggregates::create_row_accumulators(&row_aggr_expr)?;

let row_aggr_schema = aggr_state_schema(&row_aggr_expr)?;
let row_aggr_schema = aggr_state_schema(&row_aggr_expr);

let group_schema = group_schema(&schema, group_by.expr.len());
let row_converter = RowConverter::new(
@@ -703,7 +703,7 @@ impl BoundedAggregateStream {
let row_converter_size_pre = self.row_converter.size();
for group_values in &group_by_values {
let groups_with_rows = if let AggregationOrdering {
mode: GroupByOrderMode::Ordered,
mode: GroupByOrderMode::FullyOrdered,
order_indices,
ordering,
} = &self.aggregation_ordering
@@ -725,14 +725,10 @@
let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
let per_group_indices = ranges
.into_iter()
.map(|range| {
let row = group_rows.row(range.start).owned();
// (row, batch_hashes[range.start], range)
GroupOrderInfo {
owned_row: row,
hash: batch_hashes[range.start],
range,
}
.map(|range| GroupOrderInfo {
owned_row: group_rows.row(range.start).owned(),
hash: batch_hashes[range.start],
range,
})
.collect::<Vec<_>>();
self.update_ordered_group_state(
@@ -777,7 +773,7 @@
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
if self.aggregation_ordering.mode == GroupByOrderMode::Ordered {
if self.aggregation_ordering.mode == GroupByOrderMode::FullyOrdered {
self.update_accumulators_using_batch(
&groups_with_rows,
&offsets,
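To make the ordered fast path above concrete: once the input is sorted on the GROUP BY columns, every group occupies one contiguous run of rows, which is what lets the stream finalize a group (and free its state) as soon as its run ends. Here is an illustrative stand-in for the range computation; the real code calls evaluate_partition_ranges over sort columns, and this plain i32 version is only a sketch:

```rust
use std::ops::Range;

/// Illustrative stand-in: one `Range` per run of equal adjacent values
/// in a column that is already sorted.
fn partition_ranges(values: &[i32]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=values.len() {
        if i == values.len() || values[i] != values[i - 1] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

// partition_ranges(&[1, 1, 2, 2, 2, 3]) == [0..2, 2..5, 5..6]
```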
26 changes: 18 additions & 8 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -88,7 +88,7 @@ pub enum GroupByOrderMode {
// For example, if the input is ordered by a, b, c, d and we group by b, a;
// the mode will be `FullyOrdered`, since the GROUP BY expressions a, b
// cover a full prefix of the existing ordering.
Ordered,
FullyOrdered,
}
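For readers skimming the rename: a minimal sketch of how the two variants relate to the number of GROUP BY expressions that the input ordering covers, mirroring the first_n comparison in get_working_mode below. choose_mode is a hypothetical helper, not part of the crate:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GroupByOrderMode {
    PartiallyOrdered,
    FullyOrdered, // previously named `Ordered`
}

/// Hypothetical: pick a mode from how many leading GROUP BY expressions
/// the existing input ordering covers.
fn choose_mode(ordered_prefix_len: usize, group_by_len: usize) -> Option<GroupByOrderMode> {
    match ordered_prefix_len {
        0 => None, // the existing ordering is of no use
        n if n == group_by_len => Some(GroupByOrderMode::FullyOrdered),
        _ => Some(GroupByOrderMode::PartiallyOrdered),
    }
}
```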

/// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET)
@@ -205,12 +205,15 @@ impl From<StreamType> for SendableRecordBatchStream {
}
}

/// This object encapsulates ordering-related information on GROUP BY columns.
#[derive(Debug, Clone)]
pub(crate) struct AggregationOrdering {
/// Specifies whether the GROUP BY columns are partially or fully ordered.
mode: GroupByOrderMode,
/// Stores indices such that when we iterate with these indices, GROUP BY
/// expressions match input ordering.
order_indices: Vec<usize>,
/// Actual ordering information of the GROUP BY columns.
ordering: Vec<PhysicalSortExpr>,
}
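The order_indices field is easiest to read off the tests below: with input ordered by a, b, c, d and GROUP BY (b, a), iterating the expressions at indices [1, 0] visits a and then b, which matches the input ordering. A small hypothetical check, assuming expressions are plain column names:

```rust
/// Hypothetical: permuting the GROUP BY expressions by `order_indices`
/// must yield a prefix of the input ordering.
fn matches_input_ordering(
    group_by: &[&str],
    order_indices: &[usize],
    input_ordering: &[&str],
) -> bool {
    order_indices.len() <= input_ordering.len()
        && order_indices
            .iter()
            .map(|&i| group_by[i])
            .zip(input_ordering)
            .all(|(g, o)| g == *o)
}

// matches_input_ordering(&["b", "a"], &[1, 0], &["a", "b", "c", "d"]) == true
```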

@@ -238,7 +241,7 @@ pub struct AggregateExec {
columns_map: HashMap<Column, Vec<Column>>,
/// Execution Metrics
metrics: ExecutionPlanMetricsSet,
/// Stores mode, and output ordering information for the `AggregateExec`.
/// Stores mode and output ordering information for the `AggregateExec`.
aggregation_ordering: Option<AggregationOrdering>,
}

@@ -291,12 +294,13 @@ fn get_working_mode(
input.equivalence_properties()
});
Some(if first_n == group_by.expr.len() {
(GroupByOrderMode::Ordered, ordered_group_by_indices)
(GroupByOrderMode::FullyOrdered, ordered_group_by_indices)
} else {
(GroupByOrderMode::PartiallyOrdered, ordered_group_by_indices)
})
}

/// This function gathers the ordering information for the GROUP BY columns.
fn calc_aggregation_ordering(
input: &Arc<dyn ExecutionPlan>,
group_by: &PhysicalGroupBy,
@@ -321,7 +325,7 @@
})
}

/// Grouping expressions as they occur in the output schema
/// This function returns grouping expressions as they occur in the output schema.
fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalExpr>> {
// Update column indices. Since the group by columns come first in the output schema, their
// indices are simply 0..group_by.expr.len().
@@ -644,6 +648,10 @@ impl ExecutionPlan for AggregateExec {
.map(|agg| agg.name().to_string())
.collect();
write!(f, ", aggr=[{}]", a.join(", "))?;

if let Some(aggregation_ordering) = &self.aggregation_ordering {
write!(f, ", ordering_mode={:?}", aggregation_ordering.mode)?;
}
}
}
Ok(())
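This conditional write! is the heart of the PR; per the test expectations updated below, an AggregateExec that can exploit its input ordering now renders with an ordering_mode suffix:

```
AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=FullyOrdered
```

while a plan with no detected ordering keeps the old, suffix-free form.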
@@ -959,7 +967,9 @@ mod tests {
use std::task::{Context, Poll};

use super::StreamType;
use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
use crate::physical_plan::aggregates::GroupByOrderMode::{
FullyOrdered, PartiallyOrdered,
};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
@@ -1017,13 +1027,13 @@
// Some(GroupByOrderMode) means that we can run the algorithm with the
// existing ordering, in the indicated GroupByOrderMode.
let test_cases = vec![
(vec!["a"], Some((Ordered, vec![0]))),
(vec!["a"], Some((FullyOrdered, vec![0]))),
(vec!["b"], None),
(vec!["c"], None),
(vec!["b", "a"], Some((Ordered, vec![1, 0]))),
(vec!["b", "a"], Some((FullyOrdered, vec![1, 0]))),
(vec!["c", "b"], None),
(vec!["c", "a"], Some((PartiallyOrdered, vec![1]))),
(vec!["c", "b", "a"], Some((Ordered, vec![2, 1, 0]))),
(vec!["c", "b", "a"], Some((FullyOrdered, vec![2, 1, 0]))),
(vec!["d", "a"], Some((PartiallyOrdered, vec![1]))),
(vec!["d", "b"], None),
(vec!["d", "c"], None),
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -176,7 +176,7 @@ impl GroupedHashAggregateStream {

let row_accumulators = aggregates::create_row_accumulators(&row_aggr_expr)?;

let row_aggr_schema = aggr_state_schema(&row_aggr_expr)?;
let row_aggr_schema = aggr_state_schema(&row_aggr_expr);

let group_schema = group_schema(&schema, group_by.expr.len());
let row_converter = RowConverter::new(
49 changes: 24 additions & 25 deletions datafusion/core/src/physical_plan/aggregates/utils.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

//! This file contains various utility functions that are common to both
//! batch and streaming aggregation code.

use crate::physical_plan::aggregates::AccumulatorItem;
use arrow::compute;
use arrow::compute::filter;
@@ -24,13 +27,13 @@ use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
use arrow_schema::{Schema, SchemaRef};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::AggregateExpr;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::MutableRecordBatch;
use std::sync::Arc;

/// The state that is built for each output group.
/// This object encapsulates the state that is built for each output group.
#[derive(Debug)]
pub(crate) struct GroupState {
/// The actual group by values, stored sequentially
@@ -42,32 +45,29 @@ pub(crate) struct GroupState {
// Accumulator state, one for each aggregate that doesn't support row accumulation
pub accumulator_set: Vec<AccumulatorItem>,

/// scratch space used to collect indices for input rows in a
/// bach that have values to aggregate. Reset on each batch
/// Scratch space used to collect indices for input rows in a
/// batch that have values to aggregate, reset on each batch.
pub indices: Vec<u32>,
}

#[derive(Debug)]
/// tracks what phase the aggregation is in
/// This object tracks the aggregation phase.
pub(crate) enum ExecutionState {
ReadingInput,
ProducingOutput,
Done,
}

pub(crate) fn aggr_state_schema(
aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<SchemaRef> {
pub(crate) fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> SchemaRef {
let fields = aggr_expr
.iter()
.flat_map(|expr| expr.state_fields().unwrap().into_iter())
.collect::<Vec<_>>();
Ok(Arc::new(Schema::new(fields)))
Arc::new(Schema::new(fields))
}
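With the Result wrapper gone, both stream constructors in this diff drop the `?` at their call sites. Note the trade-off visible in the body: an error from state_fields() now panics via unwrap() instead of propagating. A minimal analogue of the new shape, assuming the per-expression state fields are already materialized:

```rust
use std::sync::Arc;
use arrow_schema::{Field, Schema, SchemaRef};

/// Illustrative analogue of the new signature: concatenate per-expression
/// state fields into one schema, with no `Result` wrapper.
fn state_schema(per_expr_fields: &[Vec<Field>]) -> SchemaRef {
    let fields = per_expr_fields
        .iter()
        .flat_map(|f| f.iter().cloned())
        .collect::<Vec<_>>();
    Arc::new(Schema::new(fields))
}
```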

pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
let row_num = rows.len();
let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
let mut output = MutableRecordBatch::new(rows.len(), Arc::new(schema.clone()));
let mut row = RowReader::new(schema);

for data in rows {
@@ -112,28 +112,27 @@ pub(crate) fn slice_and_maybe_filter(
filter_opt: Option<&Arc<dyn Array>>,
offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
let sliced_arrays: Vec<ArrayRef> = aggr_array
.iter()
.map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
.map(|array| array.slice(offset, length))
.collect();

let filtered_arrays = match filter_opt.as_ref() {
Some(f) => {
let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
let filter_array = as_boolean_array(&sliced)?;
if let Some(f) = filter_opt {
let sliced = f.slice(offset, length);
let filter_array = as_boolean_array(&sliced)?;

sliced_arrays
.iter()
.map(|array| filter(array, filter_array).unwrap())
.collect::<Vec<ArrayRef>>()
}
None => sliced_arrays,
};
Ok(filtered_arrays)
sliced_arrays
.iter()
.map(|array| filter(array, filter_array).map_err(DataFusionError::ArrowError))
.collect()
} else {
Ok(sliced_arrays)
}
}
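A hedged usage sketch of the rewritten helper: slice every aggregate input down to one group's rows (offsets[0]..offsets[1]) and apply the optional filter. The data is made up, and since the function is pub(crate), a snippet like this would have to live inside the same module:

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array};
use datafusion_common::Result;

fn example() -> Result<()> {
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let keep: Arc<dyn Array> =
        Arc::new(BooleanArray::from(vec![true, true, false, true, true, true]));

    // The current group's rows sit at batch indices 2..5.
    let sliced = slice_and_maybe_filter(&[values], Some(&keep), &[2, 5])?;
    assert_eq!(sliced[0].len(), 2); // the row at index 2 is filtered out
    Ok(())
}
```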

/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)].
pub(crate) fn col_to_scalar(
array: &ArrayRef,
filter: &Option<&BooleanArray>,
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/group_by.rs
@@ -180,7 +180,7 @@ async fn test_source_sorted_groupby() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data.c)@2 as summation1]",
" AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)]",
" AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=FullyOrdered",
]
};

@@ -225,7 +225,7 @@ async fn test_source_sorted_groupby2() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data.c)@2 as summation1]",
" AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)]",
" AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=PartiallyOrdered",
]
};

2 changes: 1 addition & 1 deletion datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -357,7 +357,7 @@ Sort: d.b ASC NULLS LAST
physical_plan
SortPreservingMergeExec: [b@0 ASC NULLS LAST]
ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)]
AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)], ordering_mode=FullyOrdered
ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]