Skip to content

Commit e698fef

Browse files
committed
Review
1 parent 5719067 commit e698fef

File tree

8 files changed

+213
-213
lines changed

8 files changed

+213
-213
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,36 +27,34 @@ use crate::physical_optimizer::test_utils::{
2727
union_exec, RequirementsTestExec,
2828
};
2929

30-
use datafusion_physical_plan::{displayable, InputOrderMode};
3130
use arrow::compute::SortOptions;
32-
use arrow::datatypes::SchemaRef;
33-
use arrow_schema::DataType;
31+
use arrow::datatypes::{DataType, SchemaRef};
32+
use datafusion_common::config::ConfigOptions;
33+
use datafusion_common::tree_node::{TreeNode, TransformedResult};
3434
use datafusion_common::{Result, ScalarValue};
3535
use datafusion_expr::{JoinType, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition};
36+
use datafusion_execution::object_store::ObjectStoreUrl;
37+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
38+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
3639
use datafusion_physical_expr::expressions::{col, Column, NotExpr};
37-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3840
use datafusion_physical_expr::Partitioning;
39-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
40-
use datafusion_physical_optimizer::enforce_sorting::{EnforceSorting,PlanWithCorrespondingCoalescePartitions,PlanWithCorrespondingSort,parallelize_sorts,ensure_sorting};
41-
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants,OrderPreservationContext};
42-
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
4341
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
42+
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4443
use datafusion_physical_plan::repartition::RepartitionExec;
4544
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
46-
use datafusion_physical_plan::{get_plan_string, ExecutionPlan};
47-
use datafusion_common::config::ConfigOptions;
48-
use datafusion_common::tree_node::{TreeNode, TransformedResult};
45+
use datafusion_physical_plan::sorts::sort::SortExec;
46+
use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec, WindowAggExec};
47+
use datafusion_physical_plan::{displayable, get_plan_string, ExecutionPlan, InputOrderMode};
4948
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, ParquetSource};
50-
use datafusion_execution::object_store::ObjectStoreUrl;
5149
use datafusion::datasource::listing::PartitionedFile;
50+
use datafusion_physical_optimizer::enforce_sorting::{EnforceSorting, PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, parallelize_sorts, ensure_sorting};
51+
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants, OrderPreservationContext};
52+
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
5253
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
53-
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
54-
use datafusion_physical_plan::sorts::sort::SortExec;
54+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
5555
use datafusion_functions_aggregate::average::avg_udaf;
5656
use datafusion_functions_aggregate::count::count_udaf;
5757
use datafusion_functions_aggregate::min_max::{max_udaf, min_udaf};
58-
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
59-
use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec, WindowAggExec};
6058

6159
use rstest::rstest;
6260

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ use datafusion_functions_aggregate::count::count_udaf;
4040
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
4141
use datafusion_physical_expr::expressions::col;
4242
use datafusion_physical_expr::{expressions, PhysicalExpr};
43-
use datafusion_physical_expr_common::sort_expr::LexRequirement;
44-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
43+
use datafusion_physical_expr_common::sort_expr::{
44+
LexOrdering, LexRequirement, PhysicalSortExpr,
45+
};
4546
use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
4647
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4748
use datafusion_physical_plan::aggregates::{
@@ -60,11 +61,10 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
6061
use datafusion_physical_plan::tree_node::PlanContext;
6162
use datafusion_physical_plan::union::UnionExec;
6263
use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec};
63-
use datafusion_physical_plan::ExecutionPlan;
6464
use datafusion_physical_plan::{
65-
displayable, DisplayAs, DisplayFormatType, PlanProperties,
65+
displayable, DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode,
66+
Partitioning, PlanProperties,
6667
};
67-
use datafusion_physical_plan::{InputOrderMode, Partitioning};
6868

6969
/// Create a non sorted parquet exec
7070
pub fn parquet_exec(schema: &SchemaRef) -> Arc<DataSourceExec> {

datafusion/physical-expr-common/src/sort_expr.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,11 @@ impl LexOrdering {
361361
self.inner.clear()
362362
}
363363

364+
/// Takes ownership of the actual vector of `PhysicalSortExpr`s in the LexOrdering.
365+
pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
366+
self.inner
367+
}
368+
364369
/// Returns `true` if the LexOrdering contains `expr`
365370
pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
366371
self.inner.contains(expr)

datafusion/physical-expr/src/equivalence/class.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -456,17 +456,19 @@ impl EquivalenceGroup {
456456
/// The expression is replaced with the first expression in the equivalence
457457
/// class it matches with (if any).
458458
pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
459-
Arc::clone(&expr)
460-
.transform(|expr| {
461-
for cls in self.iter() {
462-
if cls.contains(&expr) {
463-
return Ok(Transformed::yes(cls.canonical_expr().unwrap()));
464-
}
459+
expr.transform(|expr| {
460+
for cls in self.iter() {
461+
if cls.contains(&expr) {
462+
// The unwrap below is safe because the guard above ensures
463+
// that the class is not empty.
464+
return Ok(Transformed::yes(cls.canonical_expr().unwrap()));
465465
}
466-
Ok(Transformed::no(expr))
467-
})
468-
.data()
469-
.unwrap_or(expr)
466+
}
467+
Ok(Transformed::no(expr))
468+
})
469+
.data()
470+
.unwrap()
471+
// The unwrap above is safe because the closure always returns `Ok`.
470472
}
471473

472474
/// Normalizes the given sort expression according to this group.
@@ -585,20 +587,21 @@ impl EquivalenceGroup {
585587
(new_class.len() > 1).then_some(EquivalenceClass::new(new_class))
586588
});
587589

588-
// the key is the source expression and the value is the EquivalenceClass that contains the target expression of the source expression.
589-
let mut new_classes: IndexMap<Arc<dyn PhysicalExpr>, EquivalenceClass> =
590-
IndexMap::new();
591-
mapping.iter().for_each(|(source, target)| {
592-
// We need to find equivalent projected expressions.
593-
// e.g. table with columns [a,b,c] and a == b, projection: [a+c, b+c].
594-
// To conclude that a + c == b + c we firsty normalize all source expressions
595-
// in the mapping, then merge all equivalent expressions into the classes.
590+
// The key is the source expression, and the value is the equivalence
591+
// class that contains the corresponding target expression.
592+
let mut new_classes: IndexMap<_, _> = IndexMap::new();
593+
for (source, target) in mapping.iter() {
594+
// We need to find equivalent projected expressions. For example,
595+
// consider a table with columns `[a, b, c]` with `a` == `b`, and
596+
// projection `[a + c, b + c]`. To conclude that `a + c == b + c`,
597+
// we first normalize all source expressions in the mapping, then
598+
// merge all equivalent expressions into the classes.
596599
let normalized_expr = self.normalize_expr(Arc::clone(source));
597600
new_classes
598601
.entry(normalized_expr)
599602
.or_insert_with(EquivalenceClass::new_empty)
600603
.push(Arc::clone(target));
601-
});
604+
}
602605
// Only add equivalence classes with at least two members as singleton
603606
// equivalence classes are meaningless.
604607
let new_classes = new_classes
@@ -642,7 +645,7 @@ impl EquivalenceGroup {
642645
// are equal in the resulting table.
643646
if join_type == &JoinType::Inner {
644647
for (lhs, rhs) in on.iter() {
645-
let new_lhs = Arc::clone(lhs) as _;
648+
let new_lhs = Arc::clone(lhs);
646649
// Rewrite rhs to point to the right side of the join:
647650
let new_rhs = Arc::clone(rhs)
648651
.transform(|expr| {

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -237,40 +237,58 @@ impl OrderingEquivalenceClass {
237237
None
238238
}
239239

240-
/// Checks whether the given expression is constant according to `self`'s ordering equivalence class.
240+
/// Checks whether the given expression is partially constant according to
241+
/// this ordering equivalence class.
241242
///
242-
/// This function determines whether `expr` appears in at least one combination of `descending`
243-
/// and `nulls_first` options that indicate constantness. Specifically, an expression is
244-
/// considered constant if it satisfies either of the following conditions:
245-
/// - `descending & nulls_first` and `ascending & nulls_last`
246-
/// - `descending & nulls_last` and `ascending & nulls_first`
243+
/// This function determines whether `expr` appears in at least one combination
244+
/// of `descending` and `nulls_first` options that indicate partial constantness
245+
/// in a lexicographical ordering. Specifically, an expression is considered
246+
/// a partial constant in this context if its `SortOptions` satisfies either
247+
/// of the following conditions:
248+
/// - It is `descending` with `nulls_first` and _also_ `ascending` with
249+
/// `nulls_last`, OR
250+
/// - It is `descending` with `nulls_last` and _also_ `ascending` with
251+
/// `nulls_first`.
247252
///
248-
/// We primarily use `ConstExpr` to represent globally constant expressions. However, some expressions
249-
/// may only be constant within secondary ordering constraints. This function helps identify such cases.
250-
/// If an expression is constant within a prefix ordering, it is added as a constant during
251-
/// `ordering_satisfy_requirement()` iterations after the corresponding prefix requirement is satisfied.
253+
/// The equivalence mechanism primarily uses `ConstExpr`s to represent globally
254+
/// constant expressions. However, some expressions may only be partially
255+
/// constant within a lexicographical ordering. This function helps identify
256+
/// such cases. If an expression is constant within a prefix ordering, it is
257+
/// added as a constant during `ordering_satisfy_requirement()` iterations
258+
/// after the corresponding prefix requirement is satisfied.
252259
///
253-
/// ### Example Scenarios (Assuming All Expressions Share the Same Sort Properties)
260+
/// ### Example Scenarios
261+
///
262+
/// In these scenarios, we assume that all expressions share the same sort
263+
/// properties.
254264
///
255265
/// #### Case 1: Sort Requirement `[a, c]`
256-
/// - **Existing Orderings:** `[[a, b, c], [a, d]]`, **Constants:** `[]`
257-
/// 1. `ordering_satisfy_single()` returns `true` because the requirement `[a]` is satisfied by `[a, b, c].first()`
258-
/// 2. `[a]` is added as a constant to the existing orderings
259-
/// 3. The normalized orderings become `[[b, c], [d]]`
260-
/// 4. `ordering_satisfy_single()` returns `false` for `[c]`, as neither `[b, c]` nor `[d]` satisfies `[c]`
266+
///
267+
/// **Existing Orderings:** `[[a, b, c], [a, d]]`, **Constants:** `[]`
268+
/// 1. `ordering_satisfy_single()` returns `true` because the requirement
269+
/// `a` is satisfied by `[a, b, c].first()`.
270+
/// 2. `a` is added as a constant for the next iteration.
271+
/// 3. The normalized orderings become `[[b, c], [d]]`.
272+
/// 4. `ordering_satisfy_single()` returns `false` for `c`, as neither
273+
/// `[b, c]` nor `[d]` satisfies `c`.
261274
///
262275
/// #### Case 2: Sort Requirement `[a, d]`
263-
/// - **Existing Orderings:** `[[a, b, c], [a, d]]`, **Constants:** `[]`
264-
/// 1. `ordering_satisfy_single()` returns `true` because `[a]` is satisfied by `[a, b, c].first()`
265-
/// 2. `[a]` is added as a constant to the existing orderings
266-
/// 3. The normalized orderings become `[[b, c], [d]]`
267-
/// 4. `ordering_satisfy_single()` returns `true` for `[d]`, as `[d]` satisfies `[d]`
276+
///
277+
/// **Existing Orderings:** `[[a, b, c], [a, d]]`, **Constants:** `[]`
278+
/// 1. `ordering_satisfy_single()` returns `true` because the requirement
279+
/// `a` is satisfied by `[a, b, c].first()`.
280+
/// 2. `a` is added as a constant for the next iteration.
281+
/// 3. The normalized orderings become `[[b, c], [d]]`.
282+
/// 4. `ordering_satisfy_single()` returns `true` for `d`, as `[d]` satisfies
283+
/// `d`.
268284
///
269285
/// ### Future Improvements
270-
/// This function may become unnecessary if any of the following improvements are implemented:
286+
///
287+
/// This function may become unnecessary if any of the following improvements
288+
/// are implemented:
271289
/// 1. `SortOptions` supports encoding constantness information.
272-
/// 2. `EquivalenceProperties` gains `FunctionalDependency` awareness, eliminating the need for
273-
/// `Constant` and `Constraints`.
290+
/// 2. `EquivalenceProperties` gains `FunctionalDependency` awareness, eliminating
291+
/// the need for `Constant` and `Constraints`.
274292
pub fn is_expr_partial_const(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
275293
let mut constantness_defining_pairs = [
276294
HashSet::from([(false, false), (true, true)]),

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,16 +1464,12 @@ fn update_properties(
14641464
let normalized_expr = eq_properties
14651465
.eq_group
14661466
.normalize_expr(Arc::clone(&node.expr));
1467+
let oeq_class = eq_properties.normalized_oeq_class();
14671468
if eq_properties.is_expr_constant(&normalized_expr)
1468-
|| eq_properties
1469-
.normalized_oeq_class()
1470-
.is_expr_partial_const(&normalized_expr)
1469+
|| oeq_class.is_expr_partial_const(&normalized_expr)
14711470
{
14721471
node.data.sort_properties = SortProperties::Singleton;
1473-
} else if let Some(options) = eq_properties
1474-
.normalized_oeq_class()
1475-
.get_options(&normalized_expr)
1476-
{
1472+
} else if let Some(options) = oeq_class.get_options(&normalized_expr) {
14771473
node.data.sort_properties = SortProperties::Ordered(options);
14781474
}
14791475
Ok(Transformed::yes(node))

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,14 @@ fn pushdown_requirement_to_children(
192192
Ok(Some(vec![req]))
193193
}
194194
RequirementsCompatibility::Compatible(adjusted) => {
195-
// If parent requirements are more specific than output ordering of the window plan,
196-
// then we can deduce that parent expects an ordering from the columns constructed
197-
// by the window functions. If that's the case, we block the pushdown of sort operation.
198-
let avoid_pushdown = !plan
195+
// If parent requirements are more specific than output ordering
196+
// of the window plan, then we can deduce that the parent expects
197+
// an ordering from the columns created by window functions. If
198+
// that's the case, we block the pushdown of sort operation.
199+
if !plan
199200
.equivalence_properties()
200-
.ordering_satisfy_requirement(parent_required);
201-
if avoid_pushdown {
201+
.ordering_satisfy_requirement(parent_required)
202+
{
202203
return Ok(None);
203204
}
204205

0 commit comments

Comments
 (0)