Skip to content

Support bounds evaluation for temporal data types #14523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a5f5729
Support temporal data types in interval arithmetics
ch-sc Dec 19, 2024
68784cc
fix test code
ch-sc Feb 6, 2025
bc9eb4d
cargo fmt
ch-sc Feb 6, 2025
5960170
clean up
ch-sc Feb 6, 2025
fb1d681
fix interval type coercion
ch-sc Feb 6, 2025
4ea39eb
remove NotEq from supported operators
ch-sc Feb 11, 2025
b873f50
Merge branch 'main' of github.com:apache/arrow-datafusion into suppor…
ch-sc Feb 12, 2025
b1b8aec
fix imports
ch-sc Feb 12, 2025
cb11459
cargo fmt
ch-sc Feb 12, 2025
b4bd851
revisit union interval logic
ch-sc Feb 13, 2025
aaf116b
revisit union interval logic
ch-sc Feb 14, 2025
5d65ba1
treat null as unbounded
ch-sc Feb 18, 2025
4642bfa
clean up
ch-sc Feb 18, 2025
12f7b3c
csv source yields too many column stats
ch-sc Feb 19, 2025
cbeb26c
clippy
ch-sc Feb 19, 2025
ec9f8be
csv source yields too many column stats
ch-sc Feb 19, 2025
621809f
Merge branch 'main' of github.com:apache/arrow-datafusion into suppor…
ch-sc Feb 19, 2025
d14747d
addressing comments
ch-sc Feb 24, 2025
d3810de
omit clone
ch-sc Feb 25, 2025
0147f66
Merge branch 'main' of github.com:apache/arrow-datafusion into suppor…
ch-sc Feb 25, 2025
8813adb
remove check
ch-sc Feb 25, 2025
f521a6a
UDF evaluate bounds default impl
ch-sc Feb 25, 2025
28803d6
check support before evaluating bounds
ch-sc Mar 26, 2025
4d81d6c
fmt
ch-sc Mar 26, 2025
777d58e
Merge branch 'main' of github.com:apache/arrow-datafusion into suppor…
ch-sc Mar 26, 2025
d1994cc
fix after merge
ch-sc Mar 26, 2025
318a35d
fix doc test
ch-sc Mar 26, 2025
665d0c6
fix doc test
ch-sc Mar 26, 2025
a63dd05
fix example code
ch-sc Mar 27, 2025
0d73839
clippy
ch-sc Mar 27, 2025
c232884
remove println
ch-sc Mar 27, 2025
6f7fe72
move boolean_negate & rename arithmetic_negate
ch-sc Apr 9, 2025
431c327
Merge branch 'main' of github.com:apache/arrow-datafusion into suppor…
ch-sc Apr 9, 2025
ae257d1
fix negate test
ch-sc Apr 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 29 additions & 7 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::stats::Precision;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{ColumnStatistics, DFSchema};
use datafusion::common::{internal_datafusion_err, ColumnStatistics, DFSchema};
use datafusion::common::{ScalarValue, ToDFSchema};
use datafusion::error::Result;
use datafusion::functions_aggregate::first_last::first_value_udaf;
Expand Down Expand Up @@ -264,6 +264,8 @@ fn range_analysis_demo() -> Result<()> {
// Now, we invoke the analysis code to perform the range analysis
let df_schema = DFSchema::try_from(schema)?;
let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;

assert!(physical_expr.supports_bounds_evaluation(df_schema.as_ref()));
let analysis_result = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
Expand Down Expand Up @@ -305,10 +307,17 @@ fn boundary_analysis_and_selectivity_demo() -> Result<()> {
distinct_count: Precision::Absent,
};

let field = schema.fields().first().ok_or_else(|| {
internal_datafusion_err!("schema does not have a field at index 0")
})?;

// We can then build our expression boundaries from the column statistics
// allowing the analysis to be more precise.
let initial_boundaries =
vec![ExprBoundaries::try_from_column(&schema, &column_stats, 0)?];
let initial_boundaries = vec![ExprBoundaries::try_from_column(
field.as_ref(),
&column_stats,
0,
)?];

// With the above we can perform the boundary analysis similar to the previous
// example.
Expand All @@ -317,6 +326,8 @@ fn boundary_analysis_and_selectivity_demo() -> Result<()> {
// Analysis case id >= 5000
let physical_expr =
SessionContext::new().create_physical_expr(id_greater_5000, &df_schema)?;
assert!(physical_expr.supports_bounds_evaluation(df_schema.as_ref()));

let analysis = analyze(
&physical_expr,
AnalysisContext::new(initial_boundaries.clone()),
Expand Down Expand Up @@ -359,7 +370,8 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {
let age_between_18_25 = col("age").gt(lit(18i64)).and(col("age").lt_eq(lit(25)));

// As always we need to tell DataFusion the type of the column.
let schema = Arc::new(Schema::new(vec![make_field("age", DataType::Int64)]));
let age_field = make_field("age", DataType::Int64);
let schema = Arc::new(Schema::new(vec![age_field.clone()]));

// Similarly to the example in `boundary_analysis_and_selectivity_demo` we
// can establish column statistics that can be used to describe certain
Expand All @@ -372,8 +384,11 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {
distinct_count: Precision::Absent,
};

let initial_boundaries =
vec![ExprBoundaries::try_from_column(&schema, &column_stats, 0)?];
let initial_boundaries = vec![ExprBoundaries::try_from_column(
&age_field,
&column_stats,
0,
)?];

// Before we run the analysis pass; let us describe what we can infer from
// the initial information.
Expand All @@ -394,6 +409,9 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {

let physical_expr =
SessionContext::new().create_physical_expr(age_between_18_25, &df_schema)?;

assert!(physical_expr.supports_bounds_evaluation(df_schema.as_ref()));

let analysis = analyze(
&physical_expr,
// We re-use initial_boundaries elsewhere so we must clone it.
Expand Down Expand Up @@ -446,7 +464,11 @@ fn boundary_analysis_in_conjuctions_demo() -> Result<()> {
let physical_expr = SessionContext::new()
.create_physical_expr(age_greater_than_60_less_than_18, &df_schema)?;

// Since we don't handle interval arithmetic for `OR` operator this will error out.
// This check will return false since we don't handle interval arithmetic
// for `OR` operator.
assert!(!physical_expr.supports_bounds_evaluation(df_schema.inner()));

// In consequence, this will error out.
let analysis = analyze(
&physical_expr,
AnalysisContext::new(initial_boundaries),
Expand Down
26 changes: 15 additions & 11 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ impl ScalarValue {
}

/// Calculate arithmetic negation for a scalar value
pub fn arithmetic_negate(&self) -> Result<Self> {
pub fn negate(&self) -> Result<Self> {
fn neg_checked_with_ctx<T: ArrowNativeTypeOp>(
v: T,
ctx: impl Fn() -> String,
Expand Down Expand Up @@ -1578,6 +1578,8 @@ impl ScalarValue {
tz.clone(),
))
}
ScalarValue::Boolean(None) => Ok(self.clone()),
ScalarValue::Boolean(Some(value)) => Ok(ScalarValue::Boolean(Some(!value))),
value => _internal_err!(
"Can not run arithmetic negative on scalar value {value:?}"
),
Expand Down Expand Up @@ -4598,7 +4600,7 @@ mod tests {
let try_into_value: i128 = decimal_value.clone().try_into().unwrap();
assert_eq!(123_i128, try_into_value);
assert!(!decimal_value.is_null());
let neg_decimal_value = decimal_value.arithmetic_negate()?;
let neg_decimal_value = decimal_value.negate()?;
match neg_decimal_value {
ScalarValue::Decimal128(v, _, _) => {
assert_eq!(-123, v.unwrap());
Expand Down Expand Up @@ -6334,15 +6336,17 @@ mod tests {
fn test_scalar_negative() -> Result<()> {
// positive test
let value = ScalarValue::Int32(Some(12));
assert_eq!(ScalarValue::Int32(Some(-12)), value.arithmetic_negate()?);
assert_eq!(ScalarValue::Int32(Some(-12)), value.negate()?);
let value = ScalarValue::Int32(None);
assert_eq!(ScalarValue::Int32(None), value.arithmetic_negate()?);
assert_eq!(ScalarValue::Int32(None), value.negate()?);
let value = ScalarValue::Boolean(Some(false));
assert_eq!(ScalarValue::Boolean(Some(true)), value.negate()?);

// negative test
let value = ScalarValue::UInt8(Some(12));
assert!(value.arithmetic_negate().is_err());
let value = ScalarValue::Boolean(None);
assert!(value.arithmetic_negate().is_err());
assert!(value.negate().is_err());
let value = ScalarValue::Binary(None);
assert!(value.negate().is_err());
Ok(())
}

Expand All @@ -6353,7 +6357,7 @@ mod tests {
($($val:expr),* $(,)?) => {$(
{
let value: ScalarValue = $val;
let err = value.arithmetic_negate().expect_err("Should receive overflow error on negating {value:?}");
let err = value.negate().expect_err("Should receive overflow error on negating {value:?}");
let root_err = err.find_root();
match root_err{
DataFusionError::ArrowError(
Expand Down Expand Up @@ -6404,7 +6408,7 @@ mod tests {
];
// skip float 16 because they aren't supported
for (test, expected) in float_cases.into_iter().skip(2) {
assert_eq!(test.arithmetic_negate()?, expected);
assert_eq!(test.negate()?, expected);
}
Ok(())
}
Expand All @@ -6424,7 +6428,7 @@ mod tests {
];

for (test, expected) in cases {
assert_eq!(test.arithmetic_negate().unwrap(), expected);
assert_eq!(test.negate().unwrap(), expected);
}
}

Expand Down Expand Up @@ -6721,7 +6725,7 @@ mod tests {
),
];
for (expr, expected) in cases.iter() {
let result = expr.arithmetic_negate().unwrap();
let result = expr.negate().unwrap();
assert_eq!(*expected, result, "-expr:{expr:?}");
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Precision<usize> {
/// rows are selected. A selectivity of `0.5` means half the rows are
/// selected. Will always return inexact statistics.
pub fn with_estimated_selectivity(self, selectivity: f64) -> Self {
self.map(|v| ((v as f64 * selectivity).ceil()) as usize)
self.map(|v| (v as f64 * selectivity).ceil() as usize)
.to_inexact()
}
}
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskCon
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_optimizer::join_selection::{
Expand Down Expand Up @@ -1080,21 +1079,21 @@ fn check_expr_supported() {
Operator::Plus,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr, &schema));
assert!(&supported_expr.supports_bounds_evaluation(&schema));
let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr_2, &schema));
assert!(&supported_expr_2.supports_bounds_evaluation(&schema));
let unsupported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr, &schema));
assert!(!&unsupported_expr.supports_bounds_evaluation(&schema));
let unsupported_expr_2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr_2, &schema));
assert!(!&unsupported_expr_2.supports_bounds_evaluation(&schema));
}

struct TestCase {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ datafusion-common = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
paste = "^1.0"

[dev-dependencies]
arrow-buffer = { workspace = true }
Loading