Skip to content

Add short circuit evaluation for AND and OR #15462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3e2a299
[draft] add shot circuit in BinaryExpr
acking-you Jul 3, 2024
57d6645
refactor: add check_short_circuit function
acking-you Jul 3, 2024
b85807d
refactor: change if condition to match
acking-you Jul 4, 2024
bf4e218
feat: Add support for --mem-pool-type and --memory-limit options to m…
Kontinuation Feb 14, 2025
c41465e
Chore/Add additional FFI unit tests (#14802)
timsaucer Feb 21, 2025
f171227
Improve feature flag CI coverage `datafusion` and `datafusion-functio…
alamb Mar 17, 2025
ac3c918
add extend sql & docs
acking-you Mar 27, 2025
a642394
feat: Add support for --mem-pool-type and --memory-limit options to m…
Kontinuation Feb 14, 2025
39855d3
Chore/Add additional FFI unit tests (#14802)
timsaucer Feb 21, 2025
2205edb
Improve feature flag CI coverage `datafusion` and `datafusion-functio…
alamb Mar 17, 2025
d79a75a
fix: incorrect false judgment
acking-you Mar 28, 2025
575c3f3
add test
acking-you Mar 29, 2025
8801063
separate q6 to new PR
acking-you Mar 31, 2025
ad1210e
Merge branch 'apache:main' into add_short_circuit
acking-you Apr 7, 2025
e190119
feat: Add support for --mem-pool-type and --memory-limit options to m…
Kontinuation Feb 14, 2025
f2c4caa
Chore/Add additional FFI unit tests (#14802)
timsaucer Feb 21, 2025
0ef29b1
Improve feature flag CI coverage `datafusion` and `datafusion-functio…
alamb Mar 17, 2025
6ea2502
feat: Add support for --mem-pool-type and --memory-limit options to m…
Kontinuation Feb 14, 2025
f8f4d6f
Chore/Add additional FFI unit tests (#14802)
timsaucer Feb 21, 2025
13742d2
Improve feature flag CI coverage `datafusion` and `datafusion-functio…
alamb Mar 17, 2025
1394f3f
add benchmark for boolean_op
acking-you Apr 7, 2025
59cfced
fix cargo doc
acking-you Apr 7, 2025
775e70a
add binary_op bench
acking-you Apr 7, 2025
11816e9
Better comments
acking-you Apr 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ name = "case_when"
[[bench]]
harness = false
name = "is_null"

[[bench]]
harness = false
name = "binary_op"
373 changes: 373 additions & 0 deletions datafusion/physical-expr/benches/binary_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,373 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::{
array::BooleanArray,
compute::{bool_and, bool_or},
datatypes::{DataType, Field, Schema},
};
use arrow::{array::StringArray, record_batch::RecordBatch};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::{and, binary_expr, col, lit, or, Operator};
use datafusion_physical_expr::{
expressions::{BinaryExpr, Column},
planner::logical2physical,
PhysicalExpr,
};
use std::sync::{Arc, LazyLock};

/// Generates BooleanArrays with different true/false distributions for benchmarking.
///
/// Returns a vector of tuples containing scenario name and corresponding BooleanArray.
///
/// # Arguments
/// - `TEST_ALL_FALSE` - Used to generate what kind of test data
/// - `len` - Length of the BooleanArray to generate
fn generate_boolean_cases<const TEST_ALL_FALSE: bool>(
len: usize,
) -> Vec<(String, BooleanArray)> {
let mut cases = Vec::with_capacity(6);

// Scenario 1: All elements false or all elements true
if TEST_ALL_FALSE {
let all_false = BooleanArray::from(vec![false; len]);
cases.push(("all_false".to_string(), all_false));
} else {
let all_true = BooleanArray::from(vec![true; len]);
cases.push(("all_true".to_string(), all_true));
}

// Scenario 2: Single true at first position or single false at first position
if TEST_ALL_FALSE {
let mut first_true = vec![false; len];
first_true[0] = true;
cases.push(("one_true_first".to_string(), BooleanArray::from(first_true)));
} else {
let mut first_false = vec![true; len];
first_false[0] = false;
cases.push((
"one_false_first".to_string(),
BooleanArray::from(first_false),
));
}

// Scenario 3: Single true at last position or single false at last position
if TEST_ALL_FALSE {
let mut last_true = vec![false; len];
last_true[len - 1] = true;
cases.push(("one_true_last".to_string(), BooleanArray::from(last_true)));
} else {
let mut last_false = vec![true; len];
last_false[len - 1] = false;
cases.push(("one_false_last".to_string(), BooleanArray::from(last_false)));
}

// Scenario 4: Single true at exact middle or single false at exact middle
let mid = len / 2;
if TEST_ALL_FALSE {
let mut mid_true = vec![false; len];
mid_true[mid] = true;
cases.push(("one_true_middle".to_string(), BooleanArray::from(mid_true)));
} else {
let mut mid_false = vec![true; len];
mid_false[mid] = false;
cases.push((
"one_false_middle".to_string(),
BooleanArray::from(mid_false),
));
}

// Scenario 5: Single true at 25% position or single false at 25% position
let mid_left = len / 4;
if TEST_ALL_FALSE {
let mut mid_left_true = vec![false; len];
mid_left_true[mid_left] = true;
cases.push((
"one_true_middle_left".to_string(),
BooleanArray::from(mid_left_true),
));
} else {
let mut mid_left_false = vec![true; len];
mid_left_false[mid_left] = false;
cases.push((
"one_false_middle_left".to_string(),
BooleanArray::from(mid_left_false),
));
}

// Scenario 6: Single true at 75% position or single false at 75% position
let mid_right = (3 * len) / 4;
if TEST_ALL_FALSE {
let mut mid_right_true = vec![false; len];
mid_right_true[mid_right] = true;
cases.push((
"one_true_middle_right".to_string(),
BooleanArray::from(mid_right_true),
));
} else {
let mut mid_right_false = vec![true; len];
mid_right_false[mid_right] = false;
cases.push((
"one_false_middle_right".to_string(),
BooleanArray::from(mid_right_false),
));
}

cases
}

/// Benchmarks boolean operations `false_count/bool_or` and `true_count/bool_and` on [`BooleanArray`]
/// You can run this benchmark with:
/// ```sh
/// # test true_count/false_count
/// TEST_BOOL_COUNT=1 cargo bench --bench binary_op -- boolean_ops
/// # test bool_or/bool_and
/// cargo bench --bench binary_op -- boolean_ops
/// ```
fn benchmark_boolean_ops(c: &mut Criterion) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since these are basically arrow benchmarks, I made a small follow on PR to propose removing them for now:

let len = 1_000_000; // Use one million elements for clear performance differentiation
static TEST_BOOL_COUNT: LazyLock<bool> =
LazyLock::new(|| match std::env::var("TEST_BOOL_COUNT") {
Ok(_) => {
println!("TEST_BOOL_COUNT=ON");
true
}
Err(_) => {
println!("TEST_BOOL_COUNT=OFF");
false
}
});

// Determine the test function to be executed based on the ENV `TEST_BOOL_COUNT`
fn test_func<const TEST_ALL_FALSE: bool>(array: &BooleanArray) -> bool {
// Use false_count for all false and true_count for all true
if *TEST_BOOL_COUNT {
if TEST_ALL_FALSE {
array.false_count() == array.len()
} else {
array.true_count() == array.len()
}
}
// Use bool_or for all false and bool_and for all true
else if TEST_ALL_FALSE {
match bool_or(array) {
Some(v) => !v,
None => false,
}
} else {
bool_and(array).unwrap_or(false)
}
}

// Test cases for false_count and bool_or
{
let test_cases = generate_boolean_cases::<true>(len);
for (scenario, array) in test_cases {
let arr_ref = Arc::new(array);

// Benchmark test_func across different scenarios
c.bench_function(&format!("boolean_ops/or/{}", scenario), |b| {
b.iter(|| test_func::<true>(black_box(&arr_ref)))
});
}
}
// Test cases for true_count and bool_and
{
let test_cases = generate_boolean_cases::<false>(len);
for (scenario, array) in test_cases {
let arr_ref = Arc::new(array);

// Benchmark test_func across different scenarios
c.bench_function(&format!("boolean_ops/and/{}", scenario), |b| {
b.iter(|| test_func::<false>(black_box(&arr_ref)))
});
}
}
}

/// Benchmarks AND/OR operator short-circuiting by evaluating complex regex conditions.
///
/// Creates 6 test scenarios per operator:
/// 1. All values enable short-circuit (all_true/all_false)
/// 2. 2-6 Single true/false value at different positions to measure early exit
///
/// You can run this benchmark with:
/// ```sh
/// cargo bench --bench binary_op -- short_circuit
/// ```
fn benchmark_binary_op_in_short_circuit(c: &mut Criterion) {
// Create schema with three columns
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Boolean, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Utf8, false),
]));

// Generate test data with extended content
let (b_values, c_values) = generate_test_strings(8192);

let batches_and =
create_record_batch::<true>(schema.clone(), &b_values, &c_values).unwrap();
let batches_or =
create_record_batch::<false>(schema.clone(), &b_values, &c_values).unwrap();

// Build complex string matching conditions
let right_condition_and = and(
// Check for API endpoint pattern in URLs
binary_expr(
col("b"),
Operator::RegexMatch,
lit(r#"^https://(\w+\.)?example\.(com|org)/"#),
),
// Check for markdown code blocks and summary section
binary_expr(
col("c"),
Operator::RegexMatch,
lit("```(rust|python|go)\nfn? main$$"),
),
);

let right_condition_or = or(
// Check for secure HTTPS protocol
binary_expr(
col("b"),
Operator::RegexMatch,
lit(r#"^https://(\w+\.)?example\.(com|org)/"#),
),
// Check for Rust code examples
binary_expr(
col("c"),
Operator::RegexMatch,
lit("```(rust|python|go)\nfn? main$$"),
),
);

// Create physical binary expressions
let expr_and = BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::And,
logical2physical(&right_condition_and, &schema),
);

let expr_or = BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
logical2physical(&right_condition_or, &schema),
);

// Each scenario when the test operator is `and`
{
for (name, batch) in batches_and {
c.bench_function(&format!("short_circuit/and/{}", name), |b| {
b.iter(|| expr_and.evaluate(black_box(&batch)).unwrap())
});
}
}
// Each scenario when the test operator is `or`
{
for (name, batch) in batches_or {
c.bench_function(&format!("short_circuit/or/{}", name), |b| {
b.iter(|| expr_or.evaluate(black_box(&batch)).unwrap())
});
}
}
}

/// Generate test data with computationally expensive patterns
fn generate_test_strings(num_rows: usize) -> (Vec<String>, Vec<String>) {
// Extended URL patterns with query parameters and paths
let base_urls = [
"https://api.example.com/v2/users/12345/posts?category=tech&sort=date&lang=en-US",
"https://cdn.example.net/assets/images/2023/08/15/sample-image-highres.jpg?width=1920&quality=85",
"http://service.demo.org:8080/api/data/transactions/20230815123456.csv",
"ftp://legacy.archive.example/backups/2023/Q3/database-dump.sql.gz",
"https://docs.example.co.uk/reference/advanced-topics/concurrency/parallel-processing.md#implementation-details",
];

// Extended markdown content with code blocks and structure
let base_markdowns = [
concat!(
"# Advanced Topics in Computer Science\n\n",
"## Summary\nThis article explores complex system design patterns and...\n\n",
"```rust\nfn process_data(data: &mut [i32]) {\n // Parallel processing example\n data.par_iter_mut().for_each(|x| *x *= 2);\n}\n```\n\n",
"## Performance Considerations\nWhen implementing concurrent systems...\n"
),
concat!(
"## API Documentation\n\n",
"```json\n{\n \"endpoint\": \"/api/v2/users\",\n \"methods\": [\"GET\", \"POST\"],\n \"parameters\": {\n \"page\": \"number\"\n }\n}\n```\n\n",
"# Authentication Guide\nSecure your API access using OAuth 2.0...\n"
),
concat!(
"# Data Processing Pipeline\n\n",
"```python\nfrom multiprocessing import Pool\n\ndef main():\n with Pool(8) as p:\n results = p.map(process_item, data)\n```\n\n",
"## Summary of Optimizations\n1. Batch processing\n2. Memory pooling\n3. Concurrent I/O operations\n"
),
concat!(
"# System Architecture Overview\n\n",
"## Components\n- Load Balancer\n- Database Cluster\n- Cache Service\n\n",
"```go\nfunc main() {\n router := gin.Default()\n router.GET(\"/api/health\", healthCheck)\n router.Run(\":8080\")\n}\n```\n"
),
concat!(
"## Configuration Reference\n\n",
"```yaml\nserver:\n port: 8080\n max_threads: 32\n\ndatabase:\n url: postgres://user@prod-db:5432/main\n```\n\n",
"# Deployment Strategies\nBlue-green deployment patterns with...\n"
),
];

let mut urls = Vec::with_capacity(num_rows);
let mut markdowns = Vec::with_capacity(num_rows);

for i in 0..num_rows {
urls.push(base_urls[i % 5].to_string());
markdowns.push(base_markdowns[i % 5].to_string());
}

(urls, markdowns)
}

/// Creates record batches with boolean arrays that test different short-circuit scenarios.
/// When TEST_ALL_FALSE = true: creates data for AND operator benchmarks (needs early false exit)
/// When TEST_ALL_FALSE = false: creates data for OR operator benchmarks (needs early true exit)
fn create_record_batch<const TEST_ALL_FALSE: bool>(
schema: Arc<Schema>,
b_values: &[String],
c_values: &[String],
) -> arrow::error::Result<Vec<(String, RecordBatch)>> {
// Generate data for six scenarios, but only the data for the "all_false" and "all_true" cases can be optimized through short-circuiting
let boolean_array = generate_boolean_cases::<TEST_ALL_FALSE>(b_values.len());
let mut rbs = Vec::with_capacity(boolean_array.len());
for (name, a_array) in boolean_array {
let b_array = StringArray::from(b_values.to_vec());
let c_array = StringArray::from(c_values.to_vec());
rbs.push((
name,
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(a_array), Arc::new(b_array), Arc::new(c_array)],
)?,
));
}
Ok(rbs)
}

criterion_group!(
benches,
benchmark_boolean_ops,
benchmark_binary_op_in_short_circuit
);

criterion_main!(benches);
Loading
Loading