Skip to content

Implement intermediate result blocked approach to aggregation memory management #15591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 63 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
4353748
define the needed methods in `GroupAccumulator` and `GroupValues`.
Rachelint Apr 9, 2025
a4450f7
define `GroupIndexOperations`, and impl block/flat mode for it.
Rachelint Apr 9, 2025
96b7435
support block approach for `GroupValuesPrimitive`.
Rachelint Apr 9, 2025
38d4fc6
add new emit mode.
Rachelint Apr 10, 2025
72e0fc3
make `NullState` codes common for better reuse.
Rachelint Apr 10, 2025
2dba944
improve comments.
Rachelint Apr 13, 2025
c08f23a
remove stale codes.
Rachelint Apr 13, 2025
f6f3bd6
add tests.
Rachelint Apr 13, 2025
53e8c8e
support dynamic dispatching for `NullState`.
Rachelint Apr 16, 2025
b5d231e
improve tests to cover `BlockedNullState`.
Rachelint Apr 16, 2025
a644b17
complete the impl of blocked `GroupValuesPrimitive`.
Rachelint Apr 17, 2025
67f03fc
support blocked mode for `GroupValuesPrimitive`.
Rachelint Apr 20, 2025
d165fb0
make `EmitTo::NextBlock` simpler.
Rachelint Apr 20, 2025
319e135
extract the common codes of block for reusing.
Rachelint Apr 20, 2025
808f142
support blocked mode for `PrimitiveGroupsAccumulator`.
Rachelint Apr 20, 2025
489f093
impl block based result returning logic.
Rachelint Apr 21, 2025
54002a1
add judgement about when we should enable blocked groups optimization.
Rachelint Apr 21, 2025
1f7b4bc
add config to control if we enable blocked groups optimization.
Rachelint Apr 21, 2025
e58afa5
fix e2e sql tests.
Rachelint Apr 21, 2025
4294ab7
fix group values len method.
Rachelint Apr 21, 2025
79714a4
add `memory_limit` to expose the info in `MemoryPool`.
Rachelint Apr 22, 2025
266b48e
modify the spilling judgement.
Rachelint Apr 22, 2025
55de98c
add unit tests for primitive group values.
Rachelint Apr 22, 2025
9145833
improve comments for `GroupIndexOperations`.
Rachelint Apr 23, 2025
04f15b0
add `enable_aggregation_blocked_groups` to aggr fuzzy test.
Rachelint Apr 24, 2025
be64a74
refactor and make `QueryBuilder` more configurable.
Rachelint Apr 25, 2025
7da0259
fix tests.
Rachelint Apr 25, 2025
d771038
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint Apr 27, 2025
ffb11cd
fix clippy.
Rachelint Apr 27, 2025
7f543d8
add fuzzy tests for blocked groups, and fix enable logic.
Rachelint Apr 27, 2025
868210f
update config.md and fix clippy.
Rachelint Apr 27, 2025
bdcd1b8
improve comment about blocked groups for `GroupedHashAggregateStream`.
Rachelint Apr 27, 2025
3c7317d
fix stack overflow.
Rachelint Apr 27, 2025
ff9c3ad
add extended query to see the improvement.
Rachelint Apr 27, 2025
96b3c77
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint May 3, 2025
3e23408
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint May 3, 2025
bb63628
Update datafusion/common/src/config.rs
Rachelint May 3, 2025
a7c4c7b
update config.
Rachelint May 3, 2025
e033567
fix fmt.
Rachelint May 3, 2025
d173056
fix logic test.
Rachelint May 3, 2025
29222e1
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint May 3, 2025
426e2ee
improve comments.
Rachelint May 3, 2025
948c4ce
move group index operations to a new module.
Rachelint May 3, 2025
09b97ab
define `Blocks` and use it to refactor.
Rachelint May 4, 2025
cee016c
extract blocks to a dedicated module.
Rachelint May 4, 2025
75ee3f3
add tests for `Blocks`.
Rachelint May 4, 2025
5a6e030
simplify codes about blocks.
Rachelint May 5, 2025
4c6799f
fix ci.
Rachelint May 5, 2025
93e5f9d
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint May 5, 2025
7542b49
return error when found blocked approach not supported.
Rachelint May 5, 2025
e8808eb
try to avoid using of `VecDeque`.
Rachelint May 6, 2025
e3ba95c
suggest inline.
Rachelint May 6, 2025
8807026
use unsafe to get in Vec.
Rachelint May 6, 2025
62157a9
optimize rehash.
Rachelint May 6, 2025
add409e
new blocks resize method.
Rachelint May 6, 2025
4e6193a
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint May 6, 2025
c7ce363
optimize boolean builder blocks.
Rachelint May 6, 2025
29e4d06
modify the block expand logic.
Rachelint May 7, 2025
311165a
check block distri.
Rachelint May 7, 2025
26381ea
new debug log.
Rachelint May 7, 2025
971d6d3
new poc.
Rachelint May 7, 2025
6ba1c00
new poc2.
Rachelint May 7, 2025
c42454e
poc3.
Rachelint May 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 16 additions & 9 deletions datafusion-examples/examples/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use arrow::datatypes::{Field, Schema};
use datafusion::physical_expr::NullState;
use datafusion::physical_expr::FlatNullState;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::{any::Any, sync::Arc};

Expand Down Expand Up @@ -217,7 +217,7 @@ struct GeometricMeanGroupsAccumulator {
prods: Vec<f64>,

/// Track nulls in the input / filters
null_state: NullState,
null_state: FlatNullState,
}

impl GeometricMeanGroupsAccumulator {
Expand All @@ -227,7 +227,7 @@ impl GeometricMeanGroupsAccumulator {
return_data_type: DataType::Float64,
counts: vec![],
prods: vec![],
null_state: NullState::new(),
null_state: FlatNullState::new(),
}
}
}
Expand All @@ -248,13 +248,17 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
// increment counts, update sums
self.counts.resize(total_num_groups, 0);
self.prods.resize(total_num_groups, 1.0);
// Use the `NullState` structure to generate specialized code for null / non null input elements
// Use the `NullState` structure to generate specialized code for null / non null input elements.
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
// still not support blocked groups.
// More details can see `GroupsAccumulator::supports_blocked_groups`.
self.null_state.accumulate(
group_indices,
values,
opt_filter,
total_num_groups,
|group_index, new_value| {
|_, group_index, new_value| {
let group_index = group_index as usize;
let prod = &mut self.prods[group_index];
*prod = prod.mul_wrapping(new_value);

Expand All @@ -279,13 +283,16 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
let partial_counts = values[1].as_primitive::<UInt32Type>();
// update counts with partial counts
self.counts.resize(total_num_groups, 0);
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
// still not support blocked groups.
// More details can see `GroupsAccumulator::supports_blocked_groups`.
self.null_state.accumulate(
group_indices,
partial_counts,
opt_filter,
total_num_groups,
|group_index, partial_count| {
self.counts[group_index] += partial_count;
|_, group_index, partial_count| {
self.counts[group_index as usize] += partial_count;
},
);

Expand All @@ -296,8 +303,8 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
partial_prods,
opt_filter,
total_num_groups,
|group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
let prod = &mut self.prods[group_index];
|_, group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
let prod = &mut self.prods[group_index as usize];
*prod = prod.mul_wrapping(new_value);
},
);
Expand Down
11 changes: 11 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,17 @@ config_namespace! {
/// written, it may be necessary to increase this size to avoid errors from
/// the remote end point.
pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024

/// Should DataFusion use a blocked approach to manage grouping state.
/// By default, the blocked approach is used which
/// allocates capacity based on a predefined block size firstly.
/// When the block reaches its limit, we allocate a new block (also with
/// the same predefined block size based capacity) instead of expanding
/// the current one and copying the data.
/// If `false`, a single allocation approach is used, where
/// values are managed within a single large memory block.
/// As this block grows, it often triggers numerous copies, resulting in poor performance.
pub enable_aggregation_blocked_groups: bool, default = true
}
}

Expand Down
40 changes: 40 additions & 0 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,46 @@ async fn test_median() {
.await;
}

// Testing `blocked groups optimization`
// Details of this optimization can see:
// https://github.com/apache/datafusion/issues/7065
#[tokio::test(flavor = "multi_thread")]
async fn test_blocked_groups_optimization() {
let data_gen_config = baseline_config();

// Blocked groups supporting lists:
//
// `GroupAccumulator`:
// - PrimitiveGroupsAccumulator
//
// `GroupValues`:
// - GroupValuesPrimitive
//

// Test `Numeric aggregation` + `Single group by`
let aggr_functions = ["sum", "min", "max"];
let aggr_arguments = data_gen_config.numeric_columns();
let groups_by_columns = data_gen_config.numeric_columns();

let mut query_builder = QueryBuilder::new()
.with_table_name("fuzz_table")
.with_aggregate_arguments(aggr_arguments)
.set_group_by_columns(groups_by_columns)
.with_min_group_by_columns(1)
.with_max_group_by_columns(1)
.with_no_grouping(false);

for func in aggr_functions {
query_builder = query_builder.with_aggregate_function(func);
}

AggregationFuzzerBuilder::from(data_gen_config)
.add_query_builder(query_builder)
.build()
.run()
.await;
}

/// Return a standard set of columns for testing data generation
///
/// Includes numeric and string types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl SessionContextGenerator {
target_partitions,
skip_partial_params,
sort_hint: false,
enable_aggregation_blocked_groups: false,
table_name: self.table_name.clone(),
table_provider: Arc::new(provider),
};
Expand Down Expand Up @@ -146,11 +147,14 @@ impl SessionContextGenerator {
(provider, false)
};

let enable_aggregation_blocked_groups = rng.gen_bool(0.5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is any value to testing the old code path (enable_aggregation_blocked_groups = false) if our goal is to remove it eventually.

I recommend only testing with the flag set to the default value


let builder = GeneratedSessionContextBuilder {
batch_size,
target_partitions,
sort_hint,
skip_partial_params,
enable_aggregation_blocked_groups,
table_name: self.table_name.clone(),
table_provider: Arc::new(provider),
};
Expand All @@ -174,6 +178,7 @@ struct GeneratedSessionContextBuilder {
target_partitions: usize,
sort_hint: bool,
skip_partial_params: SkipPartialParams,
enable_aggregation_blocked_groups: bool,
table_name: String,
table_provider: Arc<dyn TableProvider>,
}
Expand All @@ -198,6 +203,10 @@ impl GeneratedSessionContextBuilder {
"datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
&ScalarValue::Float64(Some(self.skip_partial_params.ratio_threshold)),
);
session_config = session_config.set(
"datafusion.execution.enable_aggregation_blocked_groups",
&ScalarValue::Boolean(Some(self.enable_aggregation_blocked_groups)),
);

let ctx = SessionContext::new_with_config(session_config);
ctx.register_table(self.table_name, self.table_provider)?;
Expand All @@ -207,6 +216,7 @@ impl GeneratedSessionContextBuilder {
target_partitions: self.target_partitions,
sort_hint: self.sort_hint,
skip_partial_params: self.skip_partial_params,
enable_aggregation_blocked_groups: self.enable_aggregation_blocked_groups,
};

Ok(SessionContextWithParams { ctx, params })
Expand All @@ -221,6 +231,7 @@ pub struct SessionContextParams {
target_partitions: usize,
sort_hint: bool,
skip_partial_params: SkipPartialParams,
enable_aggregation_blocked_groups: bool,
}

/// Partial skipping parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ impl QueryBuilder {

fn generate_query(&self) -> String {
let group_by = self.random_group_by();
dbg!(&group_by);
let mut query = String::from("SELECT ");
query.push_str(&group_by.join(", "));
if !group_by.is_empty() {
Expand Down
57 changes: 55 additions & 2 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@
//! Vectorized [`GroupsAccumulator`]

use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::{not_impl_err, Result};
use datafusion_common::{not_impl_err, DataFusionError, Result};

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
/// Emit all groups
/// Emit all groups, will clear all existing group indexes
All,
/// Emit only the first `n` groups and shift all existing group
/// indexes down by `n`.
///
/// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
/// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
First(usize),
/// Emit next block in the blocked managed groups
///
/// Similar as `Emit::All`, will also clear all existing group indexes
NextBlock,
}

impl EmitTo {
Expand All @@ -39,6 +43,9 @@ impl EmitTo {
/// remaining values in `v`.
///
/// This avoids copying if Self::All
///
/// NOTICE: only support emit strategies: `Self::All` and `Self::First`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// NOTICE: only support emit strategies: `Self::All` and `Self::First`
/// NOTICE: only support emit strategies: `Self::All` and `Self::First`
/// Will call `panic` if called with `Self::NextBlock`

///
pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
match self {
Self::All => {
Expand All @@ -52,6 +59,7 @@ impl EmitTo {
std::mem::swap(v, &mut t);
t
}
Self::NextBlock => unreachable!("don't support take block in take_needed"),
}
}
}
Expand Down Expand Up @@ -250,4 +258,49 @@ pub trait GroupsAccumulator: Send {
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;

/// Returns `true` if this accumulator supports blocked groups.
///
/// Blocked groups(or called blocked management approach) is an optimization
/// to reduce the cost of managing aggregation intermediate states.
///
/// Here is brief introduction for two states management approaches:
/// - Blocked approach, states are stored and managed in multiple `Vec`s,
/// we call it `Block`s. Organize like this is for avoiding to resize `Vec`
/// and allocate a new `Vec` instead to reduce cost and get better performance.
/// When locating data in `Block`s, we need to use `block_id` to locate the
/// needed `Block` at first, and use `block_offset` to locate the needed
/// data in `Block` after.
///
/// - Single approach, all states are stored and managed in a single large `Block`.
/// So when locating data, `block_id` will always be 0, and we only need `block_offset`
/// to locate data in the single `Block`.
///
/// More details can see:
/// <https://github.com/apache/datafusion/issues/7065>
///
fn supports_blocked_groups(&self) -> bool {
false
}

/// Alter the block size in the accumulator
///
/// If the target block size is `None`, it will use a single big
/// block(can think it a `Vec`) to manage the state.
///
/// If the target block size` is `Some(blk_size)`, it will try to
/// set the block size to `blk_size`, and the try will only success
/// when the accumulator has supported blocked mode.
///
/// NOTICE: After altering block size, all data in previous will be cleared.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that all existing accumulators will be cleared?

Copy link
Contributor Author

@Rachelint Rachelint May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be usually used in load back + merge step in spilling:

  • Emit the rest blocks at first
  • Clear all stale data, and switch to flat mode and perform sorted aggregation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// NOTICE: After altering block size, all data in previous will be cleared.
/// NOTICE: After altering block size, all data in existing accumulators will be cleared.

///
fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
if block_size.is_some() {
return Err(DataFusionError::NotImplemented(
"this accumulator doesn't support blocked mode yet".to_string(),
));
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]

pub mod accumulate;
pub mod blocks;
pub mod bool_op;
pub mod group_index_operations;
pub mod nulls;
pub mod prim_op;

Expand Down
Loading
Loading