Skip to content

feat: add optimizer config param to avoid grouping partitions prefer_existing_union #10259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ config_namespace! {
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20

/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false
}
}

Expand Down
90 changes: 83 additions & 7 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,11 @@ fn ensure_distribution(
.collect::<Result<Vec<_>>>()?;

let children_plans = children.iter().map(|c| c.plan.clone()).collect::<Vec<_>>();
plan = if plan.as_any().is::<UnionExec>() && can_interleave(children_plans.iter()) {

plan = if plan.as_any().is::<UnionExec>()
&& !config.optimizer.prefer_existing_union
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that one of the motivations (influxdata#4) for this new flag is to preserve the sorting of the Union - would re-using the existing flag prefer_existing_sort make sense here?

One argument against that would be if you wanted prefer_existing_sort: false and prefer_existing_union: true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that one of the motivations (influxdata#4) for this new flag is to preserve the sorting of the Union - would re-using the existing flag prefer_existing_sort make sense here?

One argument against that would be if you wanted prefer_existing_sort: false and prefer_existing_union: true.

This is an excellent point @phillipleblanc . I just double checked and we actually have prefer_existing_sort set to true in IOx already (code ref, not public)

What do you think about using the existing flag @NGA-TRAN ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phillipleblanc and @alamb
@mustafasrepo 's comment #10259 (comment) suggest to use the current approach. Do I need to do anything more here?

Copy link
Contributor

@phillipleblanc phillipleblanc Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I think this approach looks good! Thanks!

&& can_interleave(children_plans.iter())
{
// Add a special case for [`UnionExec`] since we want to "bubble up"
// hash-partitioned data. So instead of
//
Expand Down Expand Up @@ -1721,23 +1725,33 @@ pub(crate) mod tests {
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these macros are getting a bit hairy -- maybe we can clean them up (convert to using functions rather than macros) in a subsequent PR

};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;

// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
Expand Down Expand Up @@ -3097,7 +3111,67 @@ pub(crate) mod tests {
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
assert_optimized!(expected, plan.clone(), false);

Ok(())
}

#[test]
fn union_not_to_interleave() -> Result<()> {
// group by (a as a1)
let left = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);
// group by (a as a2)
let right = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);

// Union
let plan = Arc::new(UnionExec::new(vec![left, right]));

// final agg
let plan =
aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);

// Only two RepartitionExecs added, no final RepartitionExec required
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
"RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
"AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
"UnionExec",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
// no sort in the plan but since we need it as a parameter, make it default false
let prefer_existing_sort = false;
let first_enforce_distribution = true;
let prefer_existing_union = true;

assert_optimized!(
expected,
plan.clone(),
first_enforce_distribution,
prefer_existing_sort,
prefer_existing_union
);
assert_optimized!(
expected,
plan,
!first_enforce_distribution,
prefer_existing_sort,
prefer_existing_union
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observing the test union_to_interleave above it. If it covers all needed cases, these 2 tests will cover all needed cases for the not-convert, too. Let me know if it is not the case

);

Ok(())
}
Expand Down Expand Up @@ -3651,7 +3725,8 @@ pub(crate) mod tests {
true,
target_partitions,
true,
repartition_size
repartition_size,
false
);

let expected = [
Expand All @@ -3668,7 +3743,8 @@ pub(crate) mod tests {
true,
target_partitions,
true,
repartition_size
repartition_size,
false
);

Ok(())
Expand Down Expand Up @@ -3731,7 +3807,7 @@ pub(crate) mod tests {
)),
vec![("a".to_string(), "a".to_string())],
);
assert_optimized!(expected, plan, true, false, 2, true, 10);
assert_optimized!(expected, plan, true, false, 2, true, 10, false);
}
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.repartition_aggregations true
datafusion.optimizer.repartition_file_min_size 10485760
Expand Down Expand Up @@ -294,6 +295,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning.
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down