Skip to content

Commit 29079c8

Browse files
committed
Merge branch 'main' into physicalexpr-cse
# Conflicts: # datafusion/common/src/cse.rs # datafusion/optimizer/src/common_subexpr_eliminate.rs
2 parents 9361ee6 + 45a316c commit 29079c8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+658
-394
lines changed

datafusion/core/src/physical_optimizer/join_selection.rs

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,32 @@ fn swap_join_projection(
140140
left_schema_len: usize,
141141
right_schema_len: usize,
142142
projection: Option<&Vec<usize>>,
143+
join_type: &JoinType,
143144
) -> Option<Vec<usize>> {
144-
projection.map(|p| {
145-
p.iter()
146-
.map(|i| {
147-
// If the index is less than the left schema length, it is from the left schema, so we add the right schema length to it.
148-
// Otherwise, it is from the right schema, so we subtract the left schema length from it.
149-
if *i < left_schema_len {
150-
*i + right_schema_len
151-
} else {
152-
*i - left_schema_len
153-
}
154-
})
155-
.collect()
156-
})
145+
match join_type {
146+
// For Anti/Semi join types, projection should remain unmodified,
147+
// since the output schema of these joins remains the same after the swap
148+
JoinType::LeftAnti
149+
| JoinType::LeftSemi
150+
| JoinType::RightAnti
151+
| JoinType::RightSemi => projection.cloned(),
152+
153+
_ => projection.map(|p| {
154+
p.iter()
155+
.map(|i| {
156+
// If the index is less than the left schema length, it is from
157+
// the left schema, so we add the right schema length to it.
158+
// Otherwise, it is from the right schema, so we subtract the left
159+
// schema length from it.
160+
if *i < left_schema_len {
161+
*i + right_schema_len
162+
} else {
163+
*i - left_schema_len
164+
}
165+
})
166+
.collect()
167+
}),
168+
}
157169
}
158170

159171
/// This function swaps the inputs of the given join operator.
@@ -179,6 +191,7 @@ pub fn swap_hash_join(
179191
left.schema().fields().len(),
180192
right.schema().fields().len(),
181193
hash_join.projection.as_ref(),
194+
hash_join.join_type(),
182195
),
183196
partition_mode,
184197
hash_join.null_equals_null(),
@@ -1289,27 +1302,59 @@ mod tests_statistical {
12891302
);
12901303
}
12911304

1305+
#[rstest(
1306+
join_type, projection, small_on_right,
1307+
case::inner(JoinType::Inner, vec![1], true),
1308+
case::left(JoinType::Left, vec![1], true),
1309+
case::right(JoinType::Right, vec![1], true),
1310+
case::full(JoinType::Full, vec![1], true),
1311+
case::left_anti(JoinType::LeftAnti, vec![0], false),
1312+
case::left_semi(JoinType::LeftSemi, vec![0], false),
1313+
case::right_anti(JoinType::RightAnti, vec![0], true),
1314+
case::right_semi(JoinType::RightSemi, vec![0], true),
1315+
)]
12921316
#[tokio::test]
1293-
async fn test_hash_join_swap_on_joins_with_projections() -> Result<()> {
1317+
async fn test_hash_join_swap_on_joins_with_projections(
1318+
join_type: JoinType,
1319+
projection: Vec<usize>,
1320+
small_on_right: bool,
1321+
) -> Result<()> {
12941322
let (big, small) = create_big_and_small();
1323+
1324+
let left = if small_on_right { &big } else { &small };
1325+
let right = if small_on_right { &small } else { &big };
1326+
1327+
let left_on = if small_on_right {
1328+
"big_col"
1329+
} else {
1330+
"small_col"
1331+
};
1332+
let right_on = if small_on_right {
1333+
"small_col"
1334+
} else {
1335+
"big_col"
1336+
};
1337+
12951338
let join = Arc::new(HashJoinExec::try_new(
1296-
Arc::clone(&big),
1297-
Arc::clone(&small),
1339+
Arc::clone(left),
1340+
Arc::clone(right),
12981341
vec![(
1299-
Arc::new(Column::new_with_schema("big_col", &big.schema())?),
1300-
Arc::new(Column::new_with_schema("small_col", &small.schema())?),
1342+
Arc::new(Column::new_with_schema(left_on, &left.schema())?),
1343+
Arc::new(Column::new_with_schema(right_on, &right.schema())?),
13011344
)],
13021345
None,
1303-
&JoinType::Inner,
1304-
Some(vec![1]),
1346+
&join_type,
1347+
Some(projection),
13051348
PartitionMode::Partitioned,
13061349
false,
13071350
)?);
1351+
13081352
let swapped = swap_hash_join(&join.clone(), PartitionMode::Partitioned)
13091353
.expect("swap_hash_join must support joins with projections");
13101354
let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
13111355
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
13121356
);
1357+
13131358
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
13141359
assert_eq!(swapped.schema().fields.len(), 1);
13151360
assert_eq!(swapped.schema().fields[0].name(), "small_col");

datafusion/core/tests/fuzz_cases/equivalence/ordering.rs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
// under the License.
1717

1818
use crate::fuzz_cases::equivalence::utils::{
19-
create_random_schema, generate_table_for_eq_properties, is_table_same_after_sort,
20-
TestScalarUDF,
19+
convert_to_orderings, create_random_schema, create_test_schema_2,
20+
generate_table_for_eq_properties, generate_table_for_orderings,
21+
is_table_same_after_sort, TestScalarUDF,
2122
};
2223
use arrow_schema::SortOptions;
2324
use datafusion_common::{DFSchema, Result};
@@ -158,3 +159,66 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
158159

159160
Ok(())
160161
}
162+
163+
// This test checks, given a table that is ordered with `[a ASC, b ASC, c ASC, d ASC]` and `[a ASC, c ASC, b ASC, d ASC]`,
164+
// whether the table is also ordered with `[a ASC, b ASC, d ASC]` and `[a ASC, c ASC, d ASC]`
165+
// Since these orderings cannot be deduced, these orderings shouldn't be satisfied by the table generated.
166+
// For background see discussion: https://github.com/apache/datafusion/issues/12700#issuecomment-2411134296
167+
#[test]
168+
fn test_ordering_satisfy_on_data() -> Result<()> {
169+
let schema = create_test_schema_2()?;
170+
let col_a = &col("a", &schema)?;
171+
let col_b = &col("b", &schema)?;
172+
let col_c = &col("c", &schema)?;
173+
let col_d = &col("d", &schema)?;
174+
175+
let option_asc = SortOptions {
176+
descending: false,
177+
nulls_first: false,
178+
};
179+
180+
let orderings = vec![
181+
// [a ASC, b ASC, c ASC, d ASC]
182+
vec![
183+
(col_a, option_asc),
184+
(col_b, option_asc),
185+
(col_c, option_asc),
186+
(col_d, option_asc),
187+
],
188+
// [a ASC, c ASC, b ASC, d ASC]
189+
vec![
190+
(col_a, option_asc),
191+
(col_c, option_asc),
192+
(col_b, option_asc),
193+
(col_d, option_asc),
194+
],
195+
];
196+
let orderings = convert_to_orderings(&orderings);
197+
198+
let batch = generate_table_for_orderings(orderings, schema, 1000, 10)?;
199+
200+
// [a ASC, c ASC, d ASC] cannot be deduced
201+
let ordering = vec![
202+
(col_a, option_asc),
203+
(col_c, option_asc),
204+
(col_d, option_asc),
205+
];
206+
let ordering = convert_to_orderings(&[ordering])[0].clone();
207+
assert!(!is_table_same_after_sort(ordering, batch.clone())?);
208+
209+
// [a ASC, b ASC, d ASC] cannot be deduced
210+
let ordering = vec![
211+
(col_a, option_asc),
212+
(col_b, option_asc),
213+
(col_d, option_asc),
214+
];
215+
let ordering = convert_to_orderings(&[ordering])[0].clone();
216+
assert!(!is_table_same_after_sort(ordering, batch.clone())?);
217+
218+
// [a ASC, b ASC] can be deduced
219+
let ordering = vec![(col_a, option_asc), (col_b, option_asc)];
220+
let ordering = convert_to_orderings(&[ordering])[0].clone();
221+
assert!(is_table_same_after_sort(ordering, batch.clone())?);
222+
223+
Ok(())
224+
}

datafusion/core/tests/fuzz_cases/equivalence/utils.rs

Lines changed: 118 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,29 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717
//
18-
// use datafusion_physical_expr::expressions::{col, Column};
1918
use datafusion::physical_plan::expressions::col;
2019
use datafusion::physical_plan::expressions::Column;
2120
use datafusion_physical_expr::{ConstExpr, EquivalenceProperties, PhysicalSortExpr};
2221
use std::any::Any;
22+
use std::cmp::Ordering;
2323
use std::sync::Arc;
2424

2525
use arrow::compute::{lexsort_to_indices, SortColumn};
2626
use arrow::datatypes::{DataType, Field, Schema};
27-
use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, UInt32Array};
27+
use arrow_array::{
28+
ArrayRef, Float32Array, Float64Array, PrimitiveArray, RecordBatch, UInt32Array,
29+
};
2830
use arrow_schema::{SchemaRef, SortOptions};
31+
use datafusion_common::utils::{
32+
compare_rows, get_record_batch_at_indices, get_row_at_idx,
33+
};
2934
use datafusion_common::{exec_err, plan_datafusion_err, DataFusionError, Result};
30-
3135
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
3236
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
3337
use datafusion_physical_expr::equivalence::{EquivalenceClass, ProjectionMapping};
3438
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
39+
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
40+
3541
use itertools::izip;
3642
use rand::prelude::*;
3743

@@ -67,7 +73,7 @@ pub fn output_schema(
6773
}
6874

6975
// Generate a schema which consists of 6 columns (a, b, c, d, e, f)
70-
fn create_test_schema_2() -> Result<SchemaRef> {
76+
pub fn create_test_schema_2() -> Result<SchemaRef> {
7177
let a = Field::new("a", DataType::Float64, true);
7278
let b = Field::new("b", DataType::Float64, true);
7379
let c = Field::new("c", DataType::Float64, true);
@@ -374,6 +380,114 @@ pub fn generate_table_for_eq_properties(
374380
Ok(RecordBatch::try_from_iter(res)?)
375381
}
376382

383+
// Generate a table that satisfies the given orderings.
384+
pub fn generate_table_for_orderings(
385+
mut orderings: Vec<LexOrdering>,
386+
schema: SchemaRef,
387+
n_elem: usize,
388+
n_distinct: usize,
389+
) -> Result<RecordBatch> {
390+
let mut rng = StdRng::seed_from_u64(23);
391+
392+
assert!(!orderings.is_empty());
393+
// Sort the inner vectors by their lengths (longest first)
394+
orderings.sort_by_key(|v| std::cmp::Reverse(v.len()));
395+
396+
let arrays = schema
397+
.fields
398+
.iter()
399+
.map(|field| {
400+
(
401+
field.name(),
402+
generate_random_f64_array(n_elem, n_distinct, &mut rng),
403+
)
404+
})
405+
.collect::<Vec<_>>();
406+
let batch = RecordBatch::try_from_iter(arrays)?;
407+
408+
// Sort batch according to first ordering expression
409+
let sort_columns = get_sort_columns(&batch, &orderings[0])?;
410+
let sort_indices = lexsort_to_indices(&sort_columns, None)?;
411+
let mut batch = get_record_batch_at_indices(&batch, &sort_indices)?;
412+
413+
// Prune out rows that are invalid according to the remaining orderings.
414+
for ordering in orderings.iter().skip(1) {
415+
let sort_columns = get_sort_columns(&batch, ordering)?;
416+
417+
// Collect sort options and values into separate vectors.
418+
let (sort_options, sort_col_values): (Vec<_>, Vec<_>) = sort_columns
419+
.into_iter()
420+
.map(|sort_col| (sort_col.options.unwrap(), sort_col.values))
421+
.unzip();
422+
423+
let mut cur_idx = 0;
424+
let mut keep_indices = vec![cur_idx as u32];
425+
for next_idx in 1..batch.num_rows() {
426+
let cur_row = get_row_at_idx(&sort_col_values, cur_idx)?;
427+
let next_row = get_row_at_idx(&sort_col_values, next_idx)?;
428+
429+
if compare_rows(&cur_row, &next_row, &sort_options)? != Ordering::Greater {
430+
// The next row satisfies the given ordering relation with respect to the current row.
431+
keep_indices.push(next_idx as u32);
432+
cur_idx = next_idx;
433+
}
434+
}
435+
// Only keep valid rows, i.e. those that satisfy the given ordering relation.
436+
batch = get_record_batch_at_indices(
437+
&batch,
438+
&PrimitiveArray::from_iter_values(keep_indices),
439+
)?;
440+
}
441+
442+
Ok(batch)
443+
}
444+
445+
// Convert each tuple to PhysicalSortExpr
446+
pub fn convert_to_sort_exprs(
447+
in_data: &[(&Arc<dyn PhysicalExpr>, SortOptions)],
448+
) -> Vec<PhysicalSortExpr> {
449+
in_data
450+
.iter()
451+
.map(|(expr, options)| PhysicalSortExpr {
452+
expr: Arc::clone(*expr),
453+
options: *options,
454+
})
455+
.collect()
456+
}
457+
458+
// Convert each inner tuple to PhysicalSortExpr
459+
pub fn convert_to_orderings(
460+
orderings: &[Vec<(&Arc<dyn PhysicalExpr>, SortOptions)>],
461+
) -> Vec<Vec<PhysicalSortExpr>> {
462+
orderings
463+
.iter()
464+
.map(|sort_exprs| convert_to_sort_exprs(sort_exprs))
465+
.collect()
466+
}
467+
468+
// Utility function to generate random f64 array
469+
fn generate_random_f64_array(
470+
n_elems: usize,
471+
n_distinct: usize,
472+
rng: &mut StdRng,
473+
) -> ArrayRef {
474+
let values: Vec<f64> = (0..n_elems)
475+
.map(|_| rng.gen_range(0..n_distinct) as f64 / 2.0)
476+
.collect();
477+
Arc::new(Float64Array::from_iter_values(values))
478+
}
479+
480+
// Helper function to get sort columns from a batch
481+
fn get_sort_columns(
482+
batch: &RecordBatch,
483+
ordering: LexOrderingRef,
484+
) -> Result<Vec<SortColumn>> {
485+
ordering
486+
.iter()
487+
.map(|expr| expr.evaluate_to_sort_column(batch))
488+
.collect::<Result<Vec<_>>>()
489+
}
490+
377491
#[derive(Debug, Clone)]
378492
pub struct TestScalarUDF {
379493
pub(crate) signature: Signature,

datafusion/expr/src/udf_docs.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,23 +147,29 @@ impl DocumentationBuilder {
147147

148148
/// Add a standard "expression" argument to the documentation
149149
///
150-
/// This is similar to [`Self::with_argument`] except that a standard
151-
/// description is appended to the end: `"Can be a constant, column, or
152-
/// function, and any combination of arithmetic operators."`
153-
///
154-
/// The argument is rendered like
150+
/// The argument is rendered as follows when `Some(expression_type)` is passed:
155151
///
156152
/// ```text
157153
/// <arg_name>:
158154
/// <expression_type> expression to operate on. Can be a constant, column, or function, and any combination of operators.
159155
/// ```
156+
///
157+
/// The argument is rendered as follows when `None` is passed:
158+
///
159+
/// ```text
160+
/// <arg_name>:
161+
/// The expression to operate on. Can be a constant, column, or function, and any combination of operators.
162+
/// ```
160163
pub fn with_standard_argument(
161164
self,
162165
arg_name: impl Into<String>,
163-
expression_type: impl AsRef<str>,
166+
expression_type: Option<&str>,
164167
) -> Self {
165-
let expression_type = expression_type.as_ref();
166-
self.with_argument(arg_name, format!("{expression_type} expression to operate on. Can be a constant, column, or function, and any combination of operators."))
168+
let description = format!(
169+
"{} expression to operate on. Can be a constant, column, or function, and any combination of operators.",
170+
expression_type.unwrap_or("The")
171+
);
172+
self.with_argument(arg_name, description)
167173
}
168174

169175
pub fn with_related_udf(mut self, related_udf: impl Into<String>) -> Self {

0 commit comments

Comments
 (0)