Skip to content

Commit 426e2ee

Browse files
committed
improve comments.
1 parent 29222e1 commit 426e2ee

File tree

8 files changed

+88
-8
lines changed

8 files changed

+88
-8
lines changed

datafusion-examples/examples/advanced_udaf.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,10 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
248248
// increment counts, update sums
249249
self.counts.resize(total_num_groups, 0);
250250
self.prods.resize(total_num_groups, 1.0);
251-
// Use the `NullState` structure to generate specialized code for null / non null input elements
251+
// Use the `NullState` structure to generate specialized code for null / non null input elements.
252+
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
253+
// still not support blocked groups.
254+
// More details can see `GroupsAccumulator::supports_blocked_groups`.
252255
self.null_state.accumulate(
253256
group_indices,
254257
values,
@@ -280,6 +283,9 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
280283
let partial_counts = values[1].as_primitive::<UInt32Type>();
281284
// update counts with partial counts
282285
self.counts.resize(total_num_groups, 0);
286+
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
287+
// still not support blocked groups.
288+
// More details can see `GroupsAccumulator::supports_blocked_groups`.
283289
self.null_state.accumulate(
284290
group_indices,
285291
partial_counts,

datafusion/expr-common/src/groups_accumulator.rs

+27
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ pub enum EmitTo {
4141

4242
impl EmitTo {
4343
/// Remove and return `needed values` from `values`.
44+
///
45+
/// Inputs:
46+
/// - `values`, the emitting source.
47+
/// - `is_blocked_groups`, is the `values` organized in `single`
48+
/// or `blocked` approach, more details can see
49+
/// [`GroupsAccumulator::supports_blocked_groups`].
50+
///
51+
///
4452
pub fn take_needed<T>(
4553
&self,
4654
values: &mut VecDeque<Vec<T>>,
@@ -290,6 +298,25 @@ pub trait GroupsAccumulator: Send {
290298
fn size(&self) -> usize;
291299

292300
/// Returns `true` if this accumulator supports blocked groups.
301+
///
302+
/// Blocked groups(or called blocked management approach) is an optimization
303+
/// to reduce the cost of managing aggregation intermediate states.
304+
///
305+
/// Here is brief introduction for two states management approaches:
306+
/// - Blocked approach, states are stored and managed in multiple `Vec`s,
307+
/// we call it `Block`s. Organize like this is for avoiding to resize `Vec`
308+
/// and allocate a new `Vec` instead to reduce cost and get better performance.
309+
/// When locating data in `Block`s, we need to use `block_id` to locate the
310+
/// needed `Block` at first, and use `block_offset` to locate the needed
311+
/// data in `Block` after.
312+
///
313+
/// - Single approach, all states are stored and managed in a single large `Block`.
314+
/// So when locating data, `block_id` will always be 0, and we only need `block_offset`
315+
/// to locate data in the single `Block`.
316+
///
317+
/// More details can see:
318+
/// https://github.com/apache/datafusion/issues/7065
319+
///
293320
fn supports_blocked_groups(&self) -> bool {
294321
false
295322
}

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,10 @@ pub trait SeenValues: Default + Debug + Send {
245245
}
246246

247247
/// [`SeenValues`] for `flat groups input`
248-
///
248+
///
249+
/// At first, you may need to see something about `block_id` and `block_offset`
250+
/// from [`GroupsAccumulator::supports_blocked_groups`].
251+
///
249252
/// The `flat groups input` are organized like:
250253
///
251254
/// ```text
@@ -261,7 +264,9 @@ pub trait SeenValues: Default + Debug + Send {
261264
///
262265
/// For `set_bit(block_id, block_offset, value)`, `block_id` is unused,
263266
/// `block_offset` will be set to `group_index`.
264-
///
267+
///
268+
/// [`GroupsAccumulator::supports_blocked_groups`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator::supports_blocked_groups
269+
///
265270
#[derive(Debug)]
266271
pub struct FlatSeenValues {
267272
builder: BooleanBufferBuilder,
@@ -318,6 +323,9 @@ impl SeenValues for FlatSeenValues {
318323

319324
/// [`SeenValues`] for `blocked groups input`
320325
///
326+
/// At first, you may need to see something about `block_id` and `block_offset`
327+
/// from [`GroupsAccumulator::supports_blocked_groups`].
328+
///
321329
/// The `flat groups input` are organized like:
322330
///
323331
/// ```text
@@ -328,10 +336,12 @@ impl SeenValues for FlatSeenValues {
328336
/// row_n (block_id_n, block_offset_n)
329337
/// ```
330338
///
331-
/// If ` row_x (block_id_x, block_offset_x)` is not filtered
339+
/// If `row_x (block_id_x, block_offset_x)` is not filtered
332340
/// (`block_id_x, block_offset_x` is seen), `seen_values[block_id_x][block_offset_x]`
333341
/// will be set to `true`.
334342
///
343+
/// [`GroupsAccumulator::supports_blocked_groups`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator::supports_blocked_groups
344+
///
335345
#[derive(Debug, Default)]
336346
pub struct BlockedSeenValues {
337347
blocked_builders: VecDeque<BooleanBufferBuilder>,

datafusion/functions-aggregate/src/average.rs

+11
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,10 @@ where
579579
// increment counts, update sums
580580
self.counts.resize(total_num_groups, 0);
581581
self.sums.resize(total_num_groups, T::default_value());
582+
583+
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
584+
// still not support blocked groups.
585+
// More details can see `GroupsAccumulator::supports_blocked_groups`.
582586
self.null_state.accumulate(
583587
group_indices,
584588
values,
@@ -663,6 +667,10 @@ where
663667
let partial_sums = values[1].as_primitive::<T>();
664668
// update counts with partial counts
665669
self.counts.resize(total_num_groups, 0);
670+
671+
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
672+
// still not support blocked groups.
673+
// More details can see `GroupsAccumulator::supports_blocked_groups`.
666674
self.null_state.accumulate(
667675
group_indices,
668676
partial_counts,
@@ -675,6 +683,9 @@ where
675683

676684
// update sums
677685
self.sums.resize(total_num_groups, T::default_value());
686+
// `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
687+
// still not support blocked groups.
688+
// More details can see `GroupsAccumulator::supports_blocked_groups`.
678689
self.null_state.accumulate(
679690
group_indices,
680691
partial_sums,

datafusion/physical-plan/src/aggregates/group_values/mod.rs

+19
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ pub(crate) trait GroupValues: Send {
112112
fn clear_shrink(&mut self, batch: &RecordBatch);
113113

114114
/// Returns `true` if this accumulator supports blocked groups.
115+
///
116+
/// Blocked groups(or called blocked management approach) is an optimization
117+
/// to reduce the cost of managing aggregation intermediate states.
118+
///
119+
/// Here is brief introduction for two states management approaches:
120+
/// - Blocked approach, states are stored and managed in multiple `Vec`s,
121+
/// we call it `Block`s. Organize like this is for avoiding to resize `Vec`
122+
/// and allocate a new `Vec` instead to reduce cost and get better performance.
123+
/// When locating data in `Block`s, we need to use `block_id` to locate the
124+
/// needed `Block` at first, and use `block_offset` to locate the needed
125+
/// data in `Block` after.
126+
///
127+
/// - Single approach, all states are stored and managed in a single large `Block`.
128+
/// So when locating data, `block_id` will always be 0, and we only need `block_offset`
129+
/// to locate data in the single `Block`.
130+
///
131+
/// More details can see:
132+
/// https://github.com/apache/datafusion/issues/7065
133+
///
115134
fn supports_blocked_groups(&self) -> bool {
116135
false
117136
}

datafusion/physical-plan/src/aggregates/row_hash.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -360,11 +360,16 @@ impl SkipAggregationProbe {
360360
/// (also with the same predefined block size based capacity)
361361
/// instead of expanding the current one and copying the data.
362362
/// This method eliminates unnecessary copies and significantly improves performance.
363-
/// For a nice introduction to the blocked approach, maybe you can see [#7065].
363+
///
364+
/// You can find some implementation details(like how to locate data in such two approaches)
365+
/// in [`GroupsAccumulator::supports_blocked_groups`] and [`GroupValues::supports_blocked_groups`].
366+
///
367+
/// And for a really detailed introduction to the design of blocked approach, maybe you can see [#7065].
364368
///
365369
/// The conditions that trigger the blocked groups optimization can be found in
366370
/// [`maybe_enable_blocked_groups`].
367371
///
372+
/// [`GroupAccumulator`]
368373
/// [`group_values`]: Self::group_values
369374
/// [`accumulators`]: Self::accumulators
370375
/// [#7065]: https://github.com/apache/datafusion/issues/7065

datafusion/sqllogictest/test_files/information_schema.slt

+4-2
Original file line numberDiff line numberDiff line change
@@ -676,14 +676,16 @@ DROP VIEW test.xyz
676676

677677

678678
# show_external_create_table()
679-
statement error DataFusion error: Object Store error: Object at location /Users/kamille/Desktop/github/datafusion/testing/data/csv/aggregate_test_100\.csv not found: No such file or directory \(os error 2\)
679+
statement ok
680680
CREATE EXTERNAL TABLE abc
681681
STORED AS CSV
682682
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
683683
OPTIONS ('format.has_header' 'true');
684684

685-
query error DataFusion error: Error during planning: table 'datafusion\.public\.abc' not found
685+
query TTTT
686686
SHOW CREATE TABLE abc;
687+
----
688+
datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv
687689

688690
# string_agg has different arg_types but same return type. Test avoiding duplicate entries for the same function.
689691
query TTT

docs/source/user-guide/configs.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
9797
| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode |
9898
| datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. |
9999
| datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. |
100-
| datafusion.execution.enable_aggregation_blocked_groups | true | Should DataFusion use a blocked approach to manage grouping state. By default, the blocked approach is used which allocates capacity based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) instead of expanding the current one and copying the data. If `false`, a single allocation approach is used, where values are managed within a single large memory block. As this block grows, it often triggers numerous copies, resulting in poor performance. |
101100
| datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |
101+
| datafusion.execution.enable_aggregation_blocked_groups | true | Should DataFusion use a blocked approach to manage grouping state. By default, the blocked approach is used which allocates capacity based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) instead of expanding the current one and copying the data. If `false`, a single allocation approach is used, where values are managed within a single large memory block. As this block grows, it often triggers numerous copies, resulting in poor performance. |
102102
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
103103
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
104104
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |

0 commit comments

Comments
 (0)