Skip to content

Commit 2e55f59

Browse files
committed
Mut-free re-implementation
1 parent 25257ff commit 2e55f59

File tree

5 files changed

+178
-112
lines changed

5 files changed

+178
-112
lines changed

datafusion/core/src/physical_plan/filter.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,13 @@ impl ExecutionPlan for FilterExec {
172172
/// predicate's selectivity value can be determined for the incoming data.
173173
fn statistics(&self) -> Statistics {
174174
let input_stats = self.input.statistics();
175-
let mut analysis_ctx =
175+
let starter_ctx =
176176
AnalysisContext::from_statistics(self.input.schema().as_ref(), &input_stats);
177177

178-
let predicate_selectivity = self
179-
.predicate
180-
.boundaries(&mut analysis_ctx)
181-
.and_then(|bounds| bounds.selectivity);
178+
let analysis_ctx = self.predicate.analyze(starter_ctx);
182179

183-
match predicate_selectivity {
184-
Some(selectivity) => {
180+
match analysis_ctx.boundaries {
181+
Some(boundaries) => {
185182
// Build back the column level statistics from the boundaries inside the
186183
// analysis context. It is possible that these are going to be different
187184
// than the input statistics, especially when a comparison is made inside
@@ -200,14 +197,19 @@ impl ExecutionPlan for FilterExec {
200197
.collect();
201198

202199
Statistics {
203-
num_rows: input_stats
204-
.num_rows
205-
.map(|num_rows| (num_rows as f64 * selectivity).ceil() as usize),
200+
num_rows: input_stats.num_rows.zip(boundaries.selectivity).map(
201+
|(num_rows, selectivity)| {
202+
(num_rows as f64 * selectivity).ceil() as usize
203+
},
204+
),
205+
total_byte_size: input_stats
206+
.total_byte_size
207+
.zip(boundaries.selectivity)
208+
.map(|(num_rows, selectivity)| {
209+
(num_rows as f64 * selectivity).ceil() as usize
210+
}),
206211
column_statistics: Some(column_statistics),
207-
total_byte_size: input_stats.total_byte_size.map(|total_byte_size| {
208-
(total_byte_size as f64 * selectivity).ceil() as usize
209-
}),
210-
..Default::default(),
212+
..Default::default()
211213
}
212214
}
213215
None => Statistics::default(),

datafusion/physical-expr/src/expressions/binary.rs

Lines changed: 113 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ use arrow::record_batch::RecordBatch;
7373

7474
use super::column::Column;
7575
use crate::physical_expr::down_cast_any_ref;
76-
use crate::{AnalysisContext, ExprBoundaries, PhysicalExpr};
76+
use crate::{analysis_expect, AnalysisContext, ExprBoundaries, PhysicalExpr};
7777
use datafusion_common::cast::{as_boolean_array, as_decimal128_array};
7878
use datafusion_common::ScalarValue;
7979
use datafusion_common::{DataFusionError, Result};
@@ -659,7 +659,7 @@ impl PhysicalExpr for BinaryExpr {
659659
}
660660

661661
/// Return the boundaries of this binary expression's result.
662-
fn boundaries(&self, context: &mut AnalysisContext) -> Option<ExprBoundaries> {
662+
fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
663663
match &self.op {
664664
Operator::Eq
665665
| Operator::Gt
@@ -668,26 +668,42 @@ impl PhysicalExpr for BinaryExpr {
668668
| Operator::GtEq => {
669669
// We currently only support comparison when we know at least one of the sides are
670670
// a known value (a scalar). This includes predicates like a > 20 or 5 > a.
671-
let left_boundaries = self.left.boundaries(context)?;
672-
let right_boundaries = self.right.boundaries(context)?;
673-
let (op, left, right) = match right_boundaries.reduce() {
674-
Some(right_value) => {
671+
let context = self.left.analyze(context);
672+
let left_boundaries =
673+
analysis_expect!(context, context.boundaries()).clone();
674+
675+
let context = self.right.analyze(context);
676+
let right_boundaries =
677+
analysis_expect!(context, context.boundaries.clone());
678+
679+
match (left_boundaries.reduce(), right_boundaries.reduce()) {
680+
(_, Some(right_value)) => {
675681
// We know the right side is a scalar, so we can use the operator as is
676-
(self.op, self.left.clone(), right_value)
682+
analyze_expr_scalar_comparison(
683+
context,
684+
&self.op,
685+
&self.left,
686+
right_value,
687+
)
677688
}
678-
None => {
689+
(Some(left_value), _) => {
679690
// If not, we have to swap the operator and left/right (since this means
680691
// left has to be a scalar).
681-
(
682-
self.op.swap()?,
683-
self.right.clone(),
684-
left_boundaries.reduce()?,
692+
let swapped_op = analysis_expect!(context, self.op.swap());
693+
analyze_expr_scalar_comparison(
694+
context,
695+
&swapped_op,
696+
&self.right,
697+
left_value,
685698
)
686699
}
687-
};
688-
analyze_expr_scalar_comparison(&op, context, &left, right)
700+
_ => {
701+
// Both sides are columns, so we give up.
702+
context.with_boundaries(None)
703+
}
704+
}
689705
}
690-
_ => None,
706+
_ => context.with_boundaries(None),
691707
}
692708
}
693709
}
@@ -705,12 +721,12 @@ impl PartialEq<dyn Any> for BinaryExpr {
705721
// (on the right). The new boundaries will indicate whether it is always true, always
706722
// false, or unknown (with a probablistic selectivity value attached).
707723
fn analyze_expr_scalar_comparison(
724+
context: AnalysisContext,
708725
op: &Operator,
709-
context: &mut AnalysisContext,
710726
left: &Arc<dyn PhysicalExpr>,
711727
right: ScalarValue,
712-
) -> Option<ExprBoundaries> {
713-
let left_bounds = left.boundaries(context)?;
728+
) -> AnalysisContext {
729+
let left_bounds = analysis_expect!(context, left.analyze(context.clone()).boundaries);
714730
let left_min = left_bounds.min_value;
715731
let left_max = left_bounds.max_value;
716732

@@ -738,68 +754,74 @@ fn analyze_expr_scalar_comparison(
738754
assert!(!(always_selects && never_selects));
739755

740756
let selectivity = match (always_selects, never_selects) {
741-
(true, _) => Some(1.0),
742-
(_, true) => Some(0.0),
757+
(true, _) => 1.0,
758+
(_, true) => 0.0,
743759
(false, false) => {
744760
// If there is a partial overlap, then we can estimate the selectivity
745761
// by computing the ratio of the existing overlap to the total range. Since we
746762
// currently don't have access to a value distribution histogram, the part below
747763
// assumes a uniform distribution by default.
748764

749765
// Our [min, max] is inclusive, so we need to add 1 to the difference.
750-
let total_range = left_max.distance(&left_min)? + 1;
751-
let overlap_between_boundaries = match op {
752-
Operator::Lt => right.distance(&left_min)?,
753-
Operator::Gt => left_max.distance(&right)?,
754-
Operator::LtEq => right.distance(&left_min)? + 1,
755-
Operator::GtEq => left_max.distance(&right)? + 1,
756-
Operator::Eq => 1,
757-
_ => unreachable!(),
758-
};
766+
let total_range = analysis_expect!(context, left_max.distance(&left_min)) + 1;
767+
let overlap_between_boundaries = analysis_expect!(
768+
context,
769+
match op {
770+
Operator::Lt => right.distance(&left_min),
771+
Operator::Gt => left_max.distance(&right),
772+
Operator::LtEq => right.distance(&left_min).map(|dist| dist + 1),
773+
Operator::GtEq => left_max.distance(&right).map(|dist| dist + 1),
774+
Operator::Eq => Some(1),
775+
_ => None,
776+
}
777+
);
759778

760-
Some(overlap_between_boundaries as f64 / total_range as f64)
779+
overlap_between_boundaries as f64 / total_range as f64
761780
}
762-
}?;
781+
};
763782

764783
// The context represents all the knowledge we have gathered during the
765784
// analysis process, which we can now add more since the expression's upper
766785
// and lower boundaries might have changed.
767-
if let Some(column_expr) = left.as_any().downcast_ref::<Column>() {
768-
let (left_min, left_max) = match op {
769-
// TODO: for lt/gt, we technically should shrink the possibility space
770-
// by one since a < 5 means that 5 is not a possible value for `a`. However,
771-
// it is currently tricky to do so (e.g. for floats, we can get away with 4.999
772-
// so we need a smarter logic to find out what is the closest value that is
773-
// different from the scalar_value).
774-
Operator::Lt | Operator::LtEq => {
775-
// We only want to update the upper bound when we know it will help us (e.g.
776-
// it is actually smaller than what we have right now) and it is a valid
777-
// value (e.g. [0, 100] < -100 would update the boundaries to [0, -100] if
778-
// there weren't the selectivity check).
779-
if right < left_max && selectivity > 0.0 {
780-
(left_min, right)
781-
} else {
782-
(left_min, left_max)
786+
let context = match left.as_any().downcast_ref::<Column>() {
787+
Some(column_expr) => {
788+
let (left_min, left_max) = match op {
789+
// TODO: for lt/gt, we technically should shrink the possibility space
790+
// by one since a < 5 means that 5 is not a possible value for `a`. However,
791+
// it is currently tricky to do so (e.g. for floats, we can get away with 4.999
792+
// so we need a smarter logic to find out what is the closest value that is
793+
// different from the scalar_value).
794+
Operator::Lt | Operator::LtEq => {
795+
// We only want to update the upper bound when we know it will help us (e.g.
796+
// it is actually smaller than what we have right now) and it is a valid
797+
// value (e.g. [0, 100] < -100 would update the boundaries to [0, -100] if
798+
// there weren't the selectivity check).
799+
if right < left_max && selectivity > 0.0 {
800+
(left_min, right)
801+
} else {
802+
(left_min, left_max)
803+
}
783804
}
784-
}
785-
Operator::Gt | Operator::GtEq => {
786-
// Same as above, but this time we want to limit the lower bound.
787-
if right > left_min && selectivity > 0.0 {
788-
(right, left_max)
789-
} else {
790-
(left_min, left_max)
805+
Operator::Gt | Operator::GtEq => {
806+
// Same as above, but this time we want to limit the lower bound.
807+
if right > left_min && selectivity > 0.0 {
808+
(right, left_max)
809+
} else {
810+
(left_min, left_max)
811+
}
791812
}
792-
}
793-
// For equality, we don't have the range problem so even if the selectivity
794-
// is 0.0, we can still update the boundaries.
795-
Operator::Eq => (right.clone(), right),
796-
_ => unreachable!(),
797-
};
813+
// For equality, we don't have the range problem so even if the selectivity
814+
// is 0.0, we can still update the boundaries.
815+
Operator::Eq => (right.clone(), right),
816+
_ => unreachable!(),
817+
};
798818

799-
let left_bounds =
800-
ExprBoundaries::new(left_min, left_max, left_bounds.distinct_count);
801-
context.update_column(column_expr.index(), left_bounds);
802-
}
819+
let left_bounds =
820+
ExprBoundaries::new(left_min, left_max, left_bounds.distinct_count);
821+
context.with_column_update(column_expr.index(), left_bounds)
822+
}
823+
None => context,
824+
};
803825

804826
// The selectivity can't be be greater than 1.0.
805827
assert!(selectivity <= 1.0);
@@ -810,12 +832,13 @@ fn analyze_expr_scalar_comparison(
810832
_ => (false, true, 2),
811833
};
812834

813-
Some(ExprBoundaries::new_with_selectivity(
835+
let result_boundaries = Some(ExprBoundaries::new_with_selectivity(
814836
ScalarValue::Boolean(Some(pred_min)),
815837
ScalarValue::Boolean(Some(pred_max)),
816838
Some(pred_distinct),
817839
Some(selectivity),
818-
))
840+
));
841+
context.with_boundaries(result_boundaries)
819842
}
820843

821844
/// unwrap underlying (non dictionary) value, if any, to pass to a scalar kernel
@@ -3151,12 +3174,15 @@ mod tests {
31513174
];
31523175

31533176
for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
3154-
let mut context = AnalysisContext::from_statistics(&schema, &statistics);
3177+
let context = AnalysisContext::from_statistics(&schema, &statistics);
31553178
let left = col("a", &schema).unwrap();
31563179
let right = ScalarValue::Int64(Some(rhs));
3157-
let boundaries =
3158-
analyze_expr_scalar_comparison(&operator, &mut context, &left, right)
3159-
.expect("this case should not return None");
3180+
let analysis_ctx =
3181+
analyze_expr_scalar_comparison(context, &operator, &left, right);
3182+
let boundaries = analysis_ctx
3183+
.clone()
3184+
.boundaries
3185+
.expect("Analysis must complete for this test!");
31603186

31613187
assert_eq!(
31623188
boundaries
@@ -3182,14 +3208,14 @@ mod tests {
31823208
// For getting the updated boundaries, we can simply analyze the LHS
31833209
// with the existing context.
31843210
let left_boundaries = left
3185-
.boundaries(&mut context)
3211+
.analyze(analysis_ctx)
3212+
.boundaries
31863213
.expect("this case should not return None");
31873214
assert_eq!(left_boundaries.min_value, ScalarValue::Int64(Some(exp_min)));
31883215
assert_eq!(left_boundaries.max_value, ScalarValue::Int64(Some(exp_max)));
31893216
}
31903217
Ok(())
31913218
}
3192-
31933219
#[test]
31943220
fn test_comparison_result_estimate_different_type() -> Result<()> {
31953221
// A table where the column 'a' has a min of 1.3, a max of 50.7.
@@ -3228,12 +3254,15 @@ mod tests {
32283254
];
32293255

32303256
for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
3231-
let mut context = AnalysisContext::from_statistics(&schema, &statistics);
3257+
let context = AnalysisContext::from_statistics(&schema, &statistics);
32323258
let left = col("a", &schema).unwrap();
32333259
let right = ScalarValue::from(rhs);
3234-
let boundaries =
3235-
analyze_expr_scalar_comparison(&operator, &mut context, &left, right)
3236-
.expect("this case should not return None");
3260+
let analysis_ctx =
3261+
analyze_expr_scalar_comparison(context, &operator, &left, right);
3262+
let boundaries = analysis_ctx
3263+
.clone()
3264+
.boundaries
3265+
.expect("Analysis must complete for this test!");
32373266

32383267
assert_eq!(
32393268
boundaries
@@ -3257,7 +3286,8 @@ mod tests {
32573286
}
32583287

32593288
let left_boundaries = left
3260-
.boundaries(&mut context)
3289+
.analyze(analysis_ctx)
3290+
.boundaries
32613291
.expect("this case should not return None");
32623292
assert_eq!(
32633293
left_boundaries.min_value,
@@ -3286,9 +3316,10 @@ mod tests {
32863316
&schema,
32873317
);
32883318

3289-
let mut context = AnalysisContext::from_statistics(&schema, &statistics);
3319+
let context = AnalysisContext::from_statistics(&schema, &statistics);
32903320
let predicate_boundaries = gt
3291-
.boundaries(&mut context)
3321+
.analyze(context)
3322+
.boundaries
32923323
.expect("boundaries should not be None");
32933324
assert_eq!(predicate_boundaries.selectivity, Some(0.76));
32943325

@@ -3314,9 +3345,10 @@ mod tests {
33143345
&schema,
33153346
);
33163347

3317-
let mut context = AnalysisContext::from_statistics(&schema, &statistics);
3348+
let context = AnalysisContext::from_statistics(&schema, &statistics);
33183349
let predicate_boundaries = gt
3319-
.boundaries(&mut context)
3350+
.analyze(context)
3351+
.boundaries
33203352
.expect("boundaries should not be None");
33213353
assert_eq!(predicate_boundaries.selectivity, Some(0.5));
33223354

0 commit comments

Comments
 (0)