Skip to content

Commit c51d409

Browse files
committed
support blocked mode for PrimitiveGroupsAccumulator.
1 parent 34f5fa2 commit c51d409

File tree

2 files changed

+67
-11
lines changed

2 files changed

+67
-11
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator
2121
2222
use std::collections::VecDeque;
23-
use std::fmt::{self, Debug};
23+
use std::fmt::Debug;
2424
use std::marker::PhantomData;
2525

2626
use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
@@ -401,6 +401,7 @@ impl SeenValues for BlockedSeenValues {
401401

402402
/// Adapter for supporting dynamic dispatching of [`FlatNullState`] and [`BlockedNullState`].
403403
/// For performance, the cost of batch-level dynamic dispatching is acceptable.
404+
#[derive(Debug)]
404405
pub enum NullStateAdapter {
405406
Flat(FlatNullState),
406407
Blocked(BlockedNullState),
@@ -479,6 +480,13 @@ impl NullStateAdapter {
479480
}
480481
}
481482

483+
pub fn size(&self) -> usize {
484+
match self {
485+
NullStateAdapter::Flat(null_state) => null_state.size(),
486+
NullStateAdapter::Blocked(null_state) => null_state.size(),
487+
}
488+
}
489+
482490
/// Clone and build a single [`BooleanBuffer`] from `seen_values`,
483491
/// only used for testing.
484492
#[cfg(test)]

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs

+58-10
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::VecDeque;
19+
use std::iter;
1820
use std::mem::size_of;
1921
use std::sync::Arc;
2022

21-
use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray};
23+
use arrow::array::{ArrayRef, ArrowNativeTypeOp, AsArray, BooleanArray, PrimitiveArray};
2224
use arrow::buffer::NullBuffer;
2325
use arrow::compute;
2426
use arrow::datatypes::ArrowPrimitiveType;
2527
use arrow::datatypes::DataType;
2628
use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
2729
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
2830

29-
use super::accumulate::FlatNullState;
31+
use crate::aggregate::groups_accumulator::accumulate::NullStateAdapter;
32+
use crate::aggregate::groups_accumulator::{ensure_room_enough_for_blocks, Block};
3033

3134
/// An accumulator that implements a single operation over
3235
/// [`ArrowPrimitiveType`] where the accumulated state is the same as
@@ -44,7 +47,7 @@ where
4447
F: Fn(&mut T::Native, T::Native) + Send + Sync,
4548
{
4649
/// values per group, stored as the native type
47-
values: Vec<T::Native>,
50+
values: VecDeque<Vec<T::Native>>,
4851

4952
/// The output type (needed for Decimal precision and scale)
5053
data_type: DataType,
@@ -53,10 +56,12 @@ where
5356
starting_value: T::Native,
5457

5558
/// Track nulls in the input / filters
56-
null_state: FlatNullState,
59+
null_state: NullStateAdapter,
5760

5861
/// Function that computes the primitive result
5962
prim_fn: F,
63+
64+
block_size: Option<usize>,
6065
}
6166

6267
impl<T, F> PrimitiveGroupsAccumulator<T, F>
@@ -66,11 +71,12 @@ where
6671
{
6772
pub fn new(data_type: &DataType, prim_fn: F) -> Self {
6873
Self {
69-
values: vec![],
74+
values: VecDeque::new(),
7075
data_type: data_type.clone(),
71-
null_state: FlatNullState::new(),
76+
null_state: NullStateAdapter::new(None),
7277
starting_value: T::default_value(),
7378
prim_fn,
79+
block_size: None,
7480
}
7581
}
7682

@@ -97,16 +103,34 @@ where
97103
let values = values[0].as_primitive::<T>();
98104

99105
// update values
100-
self.values.resize(total_num_groups, self.starting_value);
106+
if let Some(blk_size) = self.block_size {
107+
let new_block = |block_size: usize| Vec::with_capacity(block_size);
108+
ensure_room_enough_for_blocks(
109+
&mut self.values,
110+
total_num_groups,
111+
blk_size,
112+
new_block,
113+
self.starting_value,
114+
);
115+
} else {
116+
if self.values.is_empty() {
117+
self.values.push_back(Vec::new());
118+
}
119+
120+
self.values
121+
.back_mut()
122+
.unwrap()
123+
.resize(total_num_groups, self.starting_value);
124+
}
101125

102126
// NullState dispatches / handles tracking nulls and groups that saw no values
103127
self.null_state.accumulate(
104128
group_indices,
105129
values,
106130
opt_filter,
107131
total_num_groups,
108-
|_, group_index, new_value| {
109-
let value = &mut self.values[group_index as usize];
132+
|block_id, block_offset, new_value| {
133+
let value = &mut self.values[block_id as usize][block_offset as usize];
110134
(self.prim_fn)(value, new_value);
111135
},
112136
);
@@ -115,7 +139,7 @@ where
115139
}
116140

117141
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
118-
let values = emit_to.take_needed_rows(&mut self.values);
142+
let values = emit_to.take_needed(&mut self.values, self.block_size.is_some());
119143
let nulls = self.null_state.build(emit_to);
120144
let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)) // no copy
121145
.with_data_type(self.data_type.clone());
@@ -198,4 +222,28 @@ where
198222
fn size(&self) -> usize {
199223
self.values.capacity() * size_of::<T::Native>() + self.null_state.size()
200224
}
225+
226+
fn supports_blocked_groups(&self) -> bool {
227+
true
228+
}
229+
230+
fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
231+
self.values.clear();
232+
self.null_state = NullStateAdapter::new(block_size);
233+
self.block_size = block_size;
234+
235+
Ok(())
236+
}
237+
}
238+
239+
impl<N: ArrowNativeTypeOp> Block for Vec<N> {
240+
type T = N;
241+
242+
fn len(&self) -> usize {
243+
self.len()
244+
}
245+
246+
fn fill_default_value(&mut self, fill_len: usize, default_value: Self::T) {
247+
self.extend(iter::repeat(default_value.clone()).take(fill_len));
248+
}
201249
}

0 commit comments

Comments
 (0)