Commit aec3420
Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)
* add starting code for experimenting
* stream group by linear implementation
* sorted implementation
* minor changes
* simplifications
* Simplifications
* convert vec to Option
* minor changes
* minor changes
* minor changes
* simplifications
* minor changes
* all tests pass
* refactor
* simplifications
* remove unnecessary code
* simplifications
* minor changes
* simplifications
* minor changes
* Simplify the GroupByOrderMode type
* Address reviews
* separate fully ordered case and remaining cases
* change test data type
* address reviews
* Convert to option
* retract back to old API.
* Code quality: stylistic changes
* Separate bounded stream and hash stream
* Update comments

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent a384809 · commit aec3420

23 files changed: +1818, -199 lines
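The heart of this change: when the input is already sorted on the GROUP BY columns, the aggregation no longer has to buffer every group before emitting output, because a group is complete as soon as its key value changes. Below is a minimal sketch of that principle under simplifying assumptions (plain i64 keys, a single SUM, no Arrow arrays); it illustrates the idea only and is not the committed BoundedAggregateStream code.

fn streaming_group_sums(sorted: &[(i64, i64)]) -> Vec<(i64, i64)> {
    // Input pairs are (group key, value), pre-sorted by key.
    let mut out = Vec::new();
    let mut current: Option<(i64, i64)> = None; // (key, running sum)
    for &(key, value) in sorted {
        match current {
            // Same key as the open group: keep accumulating.
            Some((k, sum)) if k == key => current = Some((k, sum + value)),
            // Key changed: the previous group can never receive more rows,
            // so it is safe to emit it immediately (pipeline-friendly).
            Some(done) => {
                out.push(done);
                current = Some((key, value));
            }
            None => current = Some((key, value)),
        }
    }
    out.extend(current); // flush the final open group
    out
}

Because finished groups stream out as soon as their key range ends, memory stays bounded by the state of the open group(s), which is what makes aggregation over unbounded inputs possible.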

datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs

Lines changed: 1043 additions & 0 deletions
Large diffs are not rendered by default.

datafusion/core/src/physical_plan/aggregates/mod.rs

Lines changed: 257 additions & 34 deletions
Large diffs are not rendered by default.

datafusion/core/src/physical_plan/aggregates/row_hash.rs

Lines changed: 11 additions & 129 deletions
@@ -24,33 +24,34 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
-use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::row::{RowConverter, SortField};
 use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use crate::physical_plan::aggregates::utils::{
+    aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
+    read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
+};
 use crate::physical_plan::aggregates::{
-    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
-    AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
+    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
+    PhysicalGroupBy, RowAccumulatorItem,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
-use arrow::compute::{cast, filter};
-use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
+use arrow::compute::cast;
+use arrow::datatypes::DataType;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
 use hashbrown::raw::RawTable;
 use itertools::izip;

@@ -68,7 +69,6 @@ use itertools::izip;
 /// 4. The state's RecordBatch is `merge`d to a new state
 /// 5. The state is mapped to the final value
 ///
-/// [Arrow-row]: OwnedRow
 /// [WordAligned]: datafusion_row::layout
 pub(crate) struct GroupedHashAggregateStream {
     schema: SchemaRef,
@@ -107,22 +107,6 @@ pub(crate) struct GroupedHashAggregateStream {
     indices: [Vec<Range<usize>>; 2],
 }
 
-#[derive(Debug)]
-/// tracks what phase the aggregation is in
-enum ExecutionState {
-    ReadingInput,
-    ProducingOutput,
-    Done,
-}
-
-fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
-    let fields = aggr_expr
-        .iter()
-        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
-        .collect::<Vec<_>>();
-    Ok(Arc::new(Schema::new(fields)))
-}
-
 impl GroupedHashAggregateStream {
     /// Create a new GroupedHashAggregateStream
     #[allow(clippy::too_many_arguments)]
@@ -617,25 +601,8 @@ impl GroupedHashAggregateStream {
     }
 }
 
-/// The state that is built for each output group.
-#[derive(Debug)]
-pub struct GroupState {
-    /// The actual group by values, stored sequentially
-    group_by_values: OwnedRow,
-
-    // Accumulator state, stored sequentially
-    pub aggregation_buffer: Vec<u8>,
-
-    // Accumulator state, one for each aggregate that doesn't support row accumulation
-    pub accumulator_set: Vec<AccumulatorItem>,
-
-    /// scratch space used to collect indices for input rows in a
-    /// bach that have values to aggregate. Reset on each batch
-    pub indices: Vec<u32>,
-}
-
 /// The state of all the groups
-pub struct AggregationState {
+pub(crate) struct AggregationState {
     pub reservation: MemoryReservation,
 
     /// Logically maps group values to an index in `group_states`
@@ -788,88 +755,3 @@ impl GroupedHashAggregateStream {
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
 }
-
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
-    let row_num = rows.len();
-    let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
-    let mut row = RowReader::new(schema);
-
-    for data in rows {
-        row.point_to(0, data);
-        read_row(&row, &mut output, schema);
-    }
-
-    output.output_as_columns()
-}
-
-fn get_at_indices(
-    input_values: &[Vec<ArrayRef>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
-    input_values
-        .iter()
-        .map(|array| get_arrayref_at_indices(array, batch_indices))
-        .collect()
-}
-
-fn get_optional_filters(
-    original_values: &[Option<Arc<dyn Array>>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
-    original_values
-        .iter()
-        .map(|array| {
-            array.as_ref().map(|array| {
-                compute::take(
-                    array.as_ref(),
-                    batch_indices,
-                    None, // None: no index check
-                )
-                .unwrap()
-            })
-        })
-        .collect()
-}
-
-fn slice_and_maybe_filter(
-    aggr_array: &[ArrayRef],
-    filter_opt: Option<&Arc<dyn Array>>,
-    offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
-    let sliced_arrays: Vec<ArrayRef> = aggr_array
-        .iter()
-        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
-        .collect();
-
-    let filtered_arrays = match filter_opt.as_ref() {
-        Some(f) => {
-            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
-            let filter_array = as_boolean_array(&sliced)?;
-
-            sliced_arrays
-                .iter()
-                .map(|array| filter(array, filter_array).unwrap())
-                .collect::<Vec<ArrayRef>>()
-        }
-        None => sliced_arrays,
-    };
-    Ok(filtered_arrays)
-}
-
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
-fn col_to_scalar(
-    array: &ArrayRef,
-    filter: &Option<&BooleanArray>,
-    row_index: usize,
-) -> Result<ScalarValue> {
-    if array.is_null(row_index) {
-        return Ok(ScalarValue::Null);
-    }
-    if let Some(filter) = filter {
-        if !filter.value(row_index) {
-            return Ok(ScalarValue::Null);
-        }
-    }
-    ScalarValue::try_from_array(array, row_index)
-}
datafusion/core/src/physical_plan/aggregates/utils.rs

Lines changed: 151 additions & 0 deletions
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_plan::aggregates::AccumulatorItem;
+use arrow::compute;
+use arrow::compute::filter;
+use arrow::row::OwnedRow;
+use arrow_array::types::UInt32Type;
+use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_physical_expr::AggregateExpr;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::MutableRecordBatch;
+use std::sync::Arc;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+pub(crate) struct GroupState {
+    /// The actual group by values, stored sequentially
+    pub group_by_values: OwnedRow,
+
+    // Accumulator state, stored sequentially
+    pub aggregation_buffer: Vec<u8>,
+
+    // Accumulator state, one for each aggregate that doesn't support row accumulation
+    pub accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// bach that have values to aggregate. Reset on each batch
+    pub indices: Vec<u32>,
+}
+
+#[derive(Debug)]
+/// tracks what phase the aggregation is in
+pub(crate) enum ExecutionState {
+    ReadingInput,
+    ProducingOutput,
+    Done,
+}
+
+pub(crate) fn aggr_state_schema(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
+    let row_num = rows.len();
+    let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
+    let mut row = RowReader::new(schema);
+
+    for data in rows {
+        row.point_to(0, data);
+        read_row(&row, &mut output, schema);
+    }
+
+    output.output_as_columns()
+}
+
+pub(crate) fn get_at_indices(
+    input_values: &[Vec<ArrayRef>],
+    batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Result<Vec<Vec<ArrayRef>>> {
+    input_values
+        .iter()
+        .map(|array| get_arrayref_at_indices(array, batch_indices))
+        .collect()
+}
+
+pub(crate) fn get_optional_filters(
+    original_values: &[Option<Arc<dyn Array>>],
+    batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Vec<Option<Arc<dyn Array>>> {
+    original_values
+        .iter()
+        .map(|array| {
+            array.as_ref().map(|array| {
+                compute::take(
+                    array.as_ref(),
+                    batch_indices,
+                    None, // None: no index check
+                )
+                .unwrap()
+            })
+        })
+        .collect()
+}
+
+pub(crate) fn slice_and_maybe_filter(
+    aggr_array: &[ArrayRef],
+    filter_opt: Option<&Arc<dyn Array>>,
+    offsets: &[usize],
+) -> Result<Vec<ArrayRef>> {
+    let sliced_arrays: Vec<ArrayRef> = aggr_array
+        .iter()
+        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
+        .collect();
+
+    let filtered_arrays = match filter_opt.as_ref() {
+        Some(f) => {
+            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
+            let filter_array = as_boolean_array(&sliced)?;
+
+            sliced_arrays
+                .iter()
+                .map(|array| filter(array, filter_array).unwrap())
+                .collect::<Vec<ArrayRef>>()
+        }
+        None => sliced_arrays,
+    };
+    Ok(filtered_arrays)
+}
+
+/// This method is similar to Scalar::try_from_array except for the Null handling.
+/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
+pub(crate) fn col_to_scalar(
+    array: &ArrayRef,
+    filter: &Option<&BooleanArray>,
+    row_index: usize,
+) -> Result<ScalarValue> {
+    if array.is_null(row_index) {
+        return Ok(ScalarValue::Null);
+    }
+    if let Some(filter) = filter {
+        if !filter.value(row_index) {
+            return Ok(ScalarValue::Null);
+        }
+    }
+    ScalarValue::try_from_array(array, row_index)
+}
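As a rough usage sketch of the helpers above (the names `values`, `filter_expr`, `start`, and `end` are hypothetical and not part of this commit): a stream slices out the rows belonging to one group, applies the aggregate's optional FILTER expression, and only then feeds the rows to an accumulator.

// Hypothetical call site, for illustration only.
// `values: Vec<ArrayRef>` holds the aggregate's input columns,
// `filter_expr: Option<Arc<dyn Array>>` its optional boolean filter,
// and `start..end` is the row range of one group within the batch.
let group_rows: Vec<ArrayRef> =
    slice_and_maybe_filter(&values, filter_expr.as_ref(), &[start, end])?;

Note also that `col_to_scalar` differs from `ScalarValue::try_from_array` only in its null handling: a null (or filtered-out) row comes back as `ScalarValue::Null` rather than as a typed variant holding `None`.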

datafusion/core/src/physical_plan/analyze.rs

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ impl ExecutionPlan for AnalyzeExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {

datafusion/core/src/physical_plan/coalesce_batches.rs

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

datafusion/core/src/physical_plan/coalesce_partitions.rs

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

datafusion/core/src/physical_plan/filter.rs

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ impl ExecutionPlan for FilterExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])

datafusion/core/src/physical_plan/joins/cross_join.rs

Lines changed: 1 addition & 1 deletion
@@ -160,7 +160,7 @@ impl ExecutionPlan for CrossJoinExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] || children[1] {

datafusion/core/src/physical_plan/joins/hash_join.rs

Lines changed: 1 addition & 1 deletion
@@ -247,7 +247,7 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         let (left, right) = (children[0], children[1]);
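The six doc-comment fixes above all describe the same contract. A hedged sketch of that convention (illustrative only, not the exact DataFusion method bodies): a pipeline-friendly operator propagates whether its inputs are infinite, while a pipeline-breaking operator must reject infinite inputs with an error.

// Illustrative sketch; each DataFusion operator implements this itself.
fn unbounded_output(children: &[bool], breaks_pipeline: bool) -> Result<bool, String> {
    let any_infinite = children.iter().any(|&c| c);
    if breaks_pipeline && any_infinite {
        // e.g. a full hash aggregation can never finish on an infinite stream
        Err("pipeline breaker cannot run on an infinite input".to_string())
    } else {
        // pass-through operators stay infinite iff any input is infinite
        Ok(any_infinite)
    }
}

This contract is exactly why the commit matters: with ordered GROUP BY columns, aggregation can take the streaming BoundedAggregateStream path instead of acting as a pipeline breaker, so unbounded inputs no longer have to be rejected.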
