Skip to content

Commit aa6b570

Browse files
NGA-TRAN authored and appletreeisyellow committed
feat: add optimizer config param to avoid grouping partitions prefer_existing_union (apache#10259)
* feat: add a config param to avoid converting union to interleave
* chore: update config for the tests
* chore: update configs.md
1 parent 108abe7 commit aa6b570

File tree

4 files changed

+89
-7
lines changed

4 files changed

+89
-7
lines changed

datafusion/common/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,9 @@ config_namespace! {
571571
/// when an exact selectivity cannot be determined. Valid values are
572572
/// between 0 (no selectivity) and 100 (all rows are selected).
573573
pub default_filter_selectivity: u8, default = 20
574+
575+
/// When set to true, the optimizer will not attempt to convert Union to Interleave
576+
pub prefer_existing_union: bool, default = false
574577
}
575578
}
576579

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,7 +1192,11 @@ fn ensure_distribution(
11921192
.collect::<Result<Vec<_>>>()?;
11931193

11941194
let children_plans = children.iter().map(|c| c.plan.clone()).collect::<Vec<_>>();
1195-
plan = if plan.as_any().is::<UnionExec>() && can_interleave(children_plans.iter()) {
1195+
1196+
plan = if plan.as_any().is::<UnionExec>()
1197+
&& !config.optimizer.prefer_existing_union
1198+
&& can_interleave(children_plans.iter())
1199+
{
11961200
// Add a special case for [`UnionExec`] since we want to "bubble up"
11971201
// hash-partitioned data. So instead of
11981202
//
@@ -1731,23 +1735,33 @@ pub(crate) mod tests {
17311735
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
17321736
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
17331737
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
1738+
/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave
17341739
macro_rules! assert_optimized {
17351740
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
1736-
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
1741+
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false);
17371742
};
17381743

17391744
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
1740-
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
1745+
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false);
1746+
};
1747+
1748+
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
1749+
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
17411750
};
17421751

17431752
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
1753+
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false);
1754+
};
1755+
1756+
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
17441757
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
17451758

17461759
let mut config = ConfigOptions::new();
17471760
config.execution.target_partitions = $TARGET_PARTITIONS;
17481761
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
17491762
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
17501763
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
1764+
config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
17511765

17521766
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
17531767
// because they were written prior to the separation of `BasicEnforcement` into
@@ -3107,7 +3121,67 @@ pub(crate) mod tests {
31073121
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
31083122
];
31093123
assert_optimized!(expected, plan.clone(), true);
3110-
assert_optimized!(expected, plan, false);
3124+
assert_optimized!(expected, plan.clone(), false);
3125+
3126+
Ok(())
3127+
}
3128+
3129+
#[test]
3130+
fn union_not_to_interleave() -> Result<()> {
3131+
// group by (a as a1)
3132+
let left = aggregate_exec_with_alias(
3133+
parquet_exec(),
3134+
vec![("a".to_string(), "a1".to_string())],
3135+
);
3136+
// group by (a as a1) -- same alias as the left input
3137+
let right = aggregate_exec_with_alias(
3138+
parquet_exec(),
3139+
vec![("a".to_string(), "a1".to_string())],
3140+
);
3141+
3142+
// Union
3143+
let plan = Arc::new(UnionExec::new(vec![left, right]));
3144+
3145+
// final agg
3146+
let plan =
3147+
aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);
3148+
3149+
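// With `prefer_existing_union = true` the UnionExec is kept (not converted to
// InterleaveExec), so a hash RepartitionExec is still added above the union
// before the final aggregate.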
3150+
let expected = &[
3151+
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
3152+
"RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
3153+
"AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
3154+
"UnionExec",
3155+
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
3156+
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
3157+
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
3158+
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
3159+
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
3160+
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
3161+
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
3162+
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
3163+
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
3164+
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
3165+
];
3166+
// The plan contains no sort, but `assert_optimized!` takes this flag positionally
// before `prefer_existing_union`, so pass its default value (false).
3167+
let prefer_existing_sort = false;
3168+
let first_enforce_distribution = true;
3169+
let prefer_existing_union = true;
3170+
3171+
assert_optimized!(
3172+
expected,
3173+
plan.clone(),
3174+
first_enforce_distribution,
3175+
prefer_existing_sort,
3176+
prefer_existing_union
3177+
);
3178+
assert_optimized!(
3179+
expected,
3180+
plan,
3181+
!first_enforce_distribution,
3182+
prefer_existing_sort,
3183+
prefer_existing_union
3184+
);
31113185

31123186
Ok(())
31133187
}
@@ -3661,7 +3735,8 @@ pub(crate) mod tests {
36613735
true,
36623736
target_partitions,
36633737
true,
3664-
repartition_size
3738+
repartition_size,
3739+
false
36653740
);
36663741

36673742
let expected = [
@@ -3678,7 +3753,8 @@ pub(crate) mod tests {
36783753
true,
36793754
target_partitions,
36803755
true,
3681-
repartition_size
3756+
repartition_size,
3757+
false
36823758
);
36833759

36843760
Ok(())
@@ -3741,7 +3817,7 @@ pub(crate) mod tests {
37413817
)),
37423818
vec![("a".to_string(), "a".to_string())],
37433819
);
3744-
assert_optimized!(expected, plan, true, false, 2, true, 10);
3820+
assert_optimized!(expected, plan, true, false, 2, true, 10, false);
37453821
}
37463822
Ok(())
37473823
}

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
216216
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
217217
datafusion.optimizer.max_passes 3
218218
datafusion.optimizer.prefer_existing_sort false
219+
datafusion.optimizer.prefer_existing_union false
219220
datafusion.optimizer.prefer_hash_join true
220221
datafusion.optimizer.repartition_aggregations true
221222
datafusion.optimizer.repartition_file_min_size 10485760
@@ -294,6 +295,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es
294295
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
295296
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
296297
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
298+
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
297299
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
298300
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
299301
datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning.

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
103103
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
104104
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
105105
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
106+
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
106107
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
107108
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
108109
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |

0 commit comments

Comments
 (0)