Skip to content

Commit ea3f632

Browse files
committed
Implement foundational filter selectivity analysis
1 parent 17d0217 commit ea3f632

File tree

10 files changed

+929
-40
lines changed

10 files changed

+929
-40
lines changed

datafusion/common/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ pub mod from_slice;
2323
#[cfg(feature = "pyarrow")]
2424
mod pyarrow;
2525
pub mod scalar;
26+
pub mod stats;
2627

2728
pub use column::Column;
2829
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
2930
pub use error::{field_not_found, DataFusionError, Result, SchemaError};
3031
pub use scalar::{ScalarType, ScalarValue};
32+
pub use stats::{ColumnStatistics, Statistics};
3133

3234
/// Downcast an Arrow Array to a concrete type, return a `DataFusionError::Internal` if the cast is
3335
/// not possible. In normal usage of DataFusion the downcast should always succeed.

datafusion/common/src/scalar.rs

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,43 @@ impl ScalarValue {
10051005
}
10061006
}
10071007

1008+
/// Absolute distance between two numeric values (of the same type). This method will return
1009+
/// None if either one of the arguments are null. It might also return None if the resulting
1010+
/// distance is greater than [`usize::MAX`]. If the type is a float, then the distance will be
1011+
/// rounded to the nearest integer.
1012+
///
1013+
///
1014+
/// Note: the datatype itself must support subtraction.
1015+
pub fn distance(&self, other: &ScalarValue) -> Option<usize> {
1016+
// Having an explicit null check here is important because the
1017+
// subtraction for scalar values will return a real value even
1018+
// if one side is null.
1019+
if self.is_null() || other.is_null() {
1020+
return None;
1021+
}
1022+
1023+
let distance = if self > other {
1024+
self.sub(other).ok()?
1025+
} else {
1026+
other.sub(self).ok()?
1027+
};
1028+
1029+
match distance {
1030+
ScalarValue::Int8(Some(v)) => usize::try_from(v).ok(),
1031+
ScalarValue::Int16(Some(v)) => usize::try_from(v).ok(),
1032+
ScalarValue::Int32(Some(v)) => usize::try_from(v).ok(),
1033+
ScalarValue::Int64(Some(v)) => usize::try_from(v).ok(),
1034+
ScalarValue::UInt8(Some(v)) => Some(v as usize),
1035+
ScalarValue::UInt16(Some(v)) => Some(v as usize),
1036+
ScalarValue::UInt32(Some(v)) => usize::try_from(v).ok(),
1037+
ScalarValue::UInt64(Some(v)) => usize::try_from(v).ok(),
1038+
// TODO: we might want to look into supporting ceil/floor here for floats.
1039+
ScalarValue::Float32(Some(v)) => Some(v.round() as usize),
1040+
ScalarValue::Float64(Some(v)) => Some(v.round() as usize),
1041+
_ => None,
1042+
}
1043+
}
1044+
10081045
/// Converts a scalar value into a 1-row array.
10091046
pub fn to_array(&self) -> ArrayRef {
10101047
self.to_array_of_size(1)
@@ -3810,4 +3847,154 @@ mod tests {
38103847
]
38113848
);
38123849
}
3850+
3851+
#[test]
fn test_scalar_distance() {
    // Computes the distance between `lhs` and `rhs` and checks it against
    // `expected`; panics if no distance could be computed.
    fn check(lhs: ScalarValue, rhs: ScalarValue, expected: usize) {
        let distance = lhs.distance(&rhs).unwrap();
        assert_eq!(distance, expected);
    }

    // Signed integers, in both argument orders and across zero.
    check(ScalarValue::Int8(Some(1)), ScalarValue::Int8(Some(2)), 1);
    check(ScalarValue::Int8(Some(2)), ScalarValue::Int8(Some(1)), 1);
    check(ScalarValue::Int16(Some(-5)), ScalarValue::Int16(Some(5)), 10);
    check(ScalarValue::Int16(Some(5)), ScalarValue::Int16(Some(-5)), 10);
    check(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(0)), 0);
    check(ScalarValue::Int32(Some(-5)), ScalarValue::Int32(Some(-10)), 5);
    check(ScalarValue::Int64(Some(-10)), ScalarValue::Int64(Some(-5)), 5);

    // Unsigned integers.
    check(ScalarValue::UInt8(Some(1)), ScalarValue::UInt8(Some(2)), 1);
    check(ScalarValue::UInt8(Some(0)), ScalarValue::UInt8(Some(0)), 0);
    check(ScalarValue::UInt16(Some(5)), ScalarValue::UInt16(Some(10)), 5);
    check(ScalarValue::UInt32(Some(10)), ScalarValue::UInt32(Some(5)), 5);
    check(ScalarValue::UInt64(Some(5)), ScalarValue::UInt64(Some(10)), 5);

    // Floats whose distance is already a whole number.
    check(
        ScalarValue::Float32(Some(1.0)),
        ScalarValue::Float32(Some(2.0)),
        1,
    );
    check(
        ScalarValue::Float32(Some(2.0)),
        ScalarValue::Float32(Some(1.0)),
        1,
    );
    check(
        ScalarValue::Float64(Some(0.0)),
        ScalarValue::Float64(Some(0.0)),
        0,
    );
    check(
        ScalarValue::Float64(Some(-5.0)),
        ScalarValue::Float64(Some(-10.0)),
        5,
    );
    check(
        ScalarValue::Float64(Some(-10.0)),
        ScalarValue::Float64(Some(-5.0)),
        5,
    );

    // Floats are currently special cased to f64/f32 and the result is rounded
    // rather than ceiled/floored. In the future we might want to take a mode
    // which specifies the rounding behavior.
    check(
        ScalarValue::Float32(Some(1.2)),
        ScalarValue::Float32(Some(1.3)),
        0,
    );
    check(
        ScalarValue::Float32(Some(1.1)),
        ScalarValue::Float32(Some(1.9)),
        1,
    );
    check(
        ScalarValue::Float64(Some(-5.3)),
        ScalarValue::Float64(Some(-9.2)),
        4,
    );
    check(
        ScalarValue::Float64(Some(-5.3)),
        ScalarValue::Float64(Some(-9.7)),
        4,
    );
    check(
        ScalarValue::Float64(Some(-5.3)),
        ScalarValue::Float64(Some(-9.9)),
        5,
    );
}
3955+
3956+
#[test]
fn test_scalar_distance_invalid() {
    // Checks that no distance can be computed between `lhs` and `rhs`.
    fn check_none(lhs: ScalarValue, rhs: ScalarValue) {
        let distance = lhs.distance(&rhs);
        assert!(distance.is_none());
    }

    // Same type but with nulls
    check_none(ScalarValue::Int8(None), ScalarValue::Int8(None));
    check_none(ScalarValue::Int8(None), ScalarValue::Int8(Some(1)));
    check_none(ScalarValue::Int8(Some(1)), ScalarValue::Int8(None));

    // Different type
    check_none(ScalarValue::Int8(Some(1)), ScalarValue::Int16(Some(1)));
    check_none(ScalarValue::Int8(Some(1)), ScalarValue::Float32(Some(1.0)));
    check_none(
        ScalarValue::Float64(Some(1.1)),
        ScalarValue::Float32(Some(2.2)),
    );
    check_none(
        ScalarValue::UInt64(Some(777)),
        ScalarValue::Int32(Some(111)),
    );

    // Different types with nulls
    check_none(ScalarValue::Int8(None), ScalarValue::Int16(Some(1)));
    check_none(ScalarValue::Int8(Some(1)), ScalarValue::Int16(None));

    // Unsupported types
    check_none(
        ScalarValue::Utf8(Some("foo".to_string())),
        ScalarValue::Utf8(Some("bar".to_string())),
    );
    check_none(
        ScalarValue::Boolean(Some(true)),
        ScalarValue::Boolean(Some(false)),
    );
    check_none(ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(1)));
    check_none(ScalarValue::Date64(Some(0)), ScalarValue::Date64(Some(1)));
    check_none(
        ScalarValue::Decimal128(Some(123), 5, 5),
        ScalarValue::Decimal128(Some(120), 5, 5),
    );
}
38134000
}

datafusion/common/src/stats.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This module provides an interface for plan level statistics.
19+
20+
use crate::ScalarValue;
21+
22+
/// Statistics for a physical plan node
23+
/// Fields are optional and can be inexact because the sources
24+
/// sometimes provide approximate estimates for performance reasons
25+
/// and the transformations output are not always predictable.
26+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
27+
pub struct Statistics {
28+
/// The number of table rows
29+
pub num_rows: Option<usize>,
30+
/// total bytes of the table rows
31+
pub total_byte_size: Option<usize>,
32+
/// Statistics on a column level
33+
pub column_statistics: Option<Vec<ColumnStatistics>>,
34+
/// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
35+
/// an estimate). Any or all other fields might still be None, in which case no information is known.
36+
/// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
37+
pub is_exact: bool,
38+
}
39+
40+
/// This table statistics are estimates about column
41+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
42+
pub struct ColumnStatistics {
43+
/// Number of null values on column
44+
pub null_count: Option<usize>,
45+
/// Maximum value of column
46+
pub max_value: Option<ScalarValue>,
47+
/// Minimum value of column
48+
pub min_value: Option<ScalarValue>,
49+
/// Number of distinct values
50+
pub distinct_count: Option<usize>,
51+
}

datafusion/core/src/physical_plan/mod.rs

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ use self::metrics::MetricsSet;
2222
use self::{
2323
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
2424
};
25+
pub use crate::common::{ColumnStatistics, Statistics};
26+
use crate::error::Result;
2527
use crate::physical_plan::expressions::PhysicalSortExpr;
26-
use crate::{error::Result, scalar::ScalarValue};
2728

2829
use arrow::datatypes::SchemaRef;
2930
use arrow::error::Result as ArrowResult;
@@ -88,36 +89,6 @@ impl Stream for EmptyRecordBatchStream {
8889
/// Physical planner interface
8990
pub use self::planner::PhysicalPlanner;
9091

91-
/// Statistics for a physical plan node
92-
/// Fields are optional and can be inexact because the sources
93-
/// sometimes provide approximate estimates for performance reasons
94-
/// and the transformations output are not always predictable.
95-
#[derive(Debug, Clone, Default, PartialEq, Eq)]
96-
pub struct Statistics {
97-
/// The number of table rows
98-
pub num_rows: Option<usize>,
99-
/// total bytes of the table rows
100-
pub total_byte_size: Option<usize>,
101-
/// Statistics on a column level
102-
pub column_statistics: Option<Vec<ColumnStatistics>>,
103-
/// If true, any field that is `Some(..)` is the actual value in the data provided by the operator (it is not
104-
/// an estimate). Any or all other fields might still be None, in which case no information is known.
105-
/// if false, any field that is `Some(..)` may contain an inexact estimate and may not be the actual value.
106-
pub is_exact: bool,
107-
}
108-
/// This table statistics are estimates about column
109-
#[derive(Clone, Debug, Default, PartialEq, Eq)]
110-
pub struct ColumnStatistics {
111-
/// Number of null values on column
112-
pub null_count: Option<usize>,
113-
/// Maximum value of column
114-
pub max_value: Option<ScalarValue>,
115-
/// Minimum value of column
116-
pub min_value: Option<ScalarValue>,
117-
/// Number of distinct values
118-
pub distinct_count: Option<usize>,
119-
}
120-
12192
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
12293
///
12394
/// Each `ExecutionPlan` is partition-aware and is responsible for

datafusion/expr/src/operator.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,41 @@ impl Operator {
115115
| Operator::StringConcat => None,
116116
}
117117
}
118+
119+
/// Return the operator where swapping lhs and rhs wouldn't change the result.
120+
///
121+
/// For example `Binary(50, >=, a)` could also be represented as `Binary(a, <=, 50)`.
122+
pub fn swap(&self) -> Option<Operator> {
123+
match self {
124+
Operator::Lt => Some(Operator::Gt),
125+
Operator::LtEq => Some(Operator::GtEq),
126+
Operator::Gt => Some(Operator::Lt),
127+
Operator::GtEq => Some(Operator::LtEq),
128+
Operator::Eq
129+
| Operator::NotEq
130+
| Operator::Like
131+
| Operator::NotLike
132+
| Operator::IsDistinctFrom
133+
| Operator::IsNotDistinctFrom
134+
| Operator::Plus
135+
| Operator::Minus
136+
| Operator::Multiply
137+
| Operator::Divide
138+
| Operator::Modulo
139+
| Operator::And
140+
| Operator::Or
141+
| Operator::RegexMatch
142+
| Operator::RegexIMatch
143+
| Operator::RegexNotMatch
144+
| Operator::RegexNotIMatch
145+
| Operator::BitwiseAnd
146+
| Operator::BitwiseOr
147+
| Operator::BitwiseXor
148+
| Operator::BitwiseShiftRight
149+
| Operator::BitwiseShiftLeft
150+
| Operator::StringConcat => None,
151+
}
152+
}
118153
}
119154

120155
impl fmt::Display for Operator {

0 commit comments

Comments
 (0)