Skip to content

Commit 9237729

Browse files
authored
Minor: Add more projection pushdown tests, clarify comments (#14963)
* Minor: Add more projection pushdown tests
* Improve comment and indenting
1 parent e5fa3be commit 9237729

File tree

2 files changed

+76
-33
lines changed

2 files changed

+76
-33
lines changed

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use datafusion_execution::object_store::ObjectStoreUrl;
3131
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3232
use datafusion_expr::{Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility};
3333
use datafusion_physical_expr::expressions::{
34-
binary, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
34+
binary, cast, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
3535
};
3636
use datafusion_physical_expr::ScalarFunctionExpr;
3737
use datafusion_physical_expr::{
@@ -1383,28 +1383,29 @@ fn test_union_after_projection() -> Result<()> {
13831383
Ok(())
13841384
}
13851385

1386-
#[test]
1387-
fn test_partition_col_projection_pushdown() -> Result<()> {
1386+
/// Returns a DataSourceExec that scans a file with (int_col, string_col)
1387+
/// and has a partitioning column partition_col (Utf8)
1388+
fn partitioned_data_source() -> Arc<DataSourceExec> {
13881389
let file_schema = Arc::new(Schema::new(vec![
13891390
Field::new("int_col", DataType::Int32, true),
13901391
Field::new("string_col", DataType::Utf8, true),
13911392
]));
13921393

1393-
let partitioned_schema = Arc::new(Schema::new(vec![
1394-
Field::new("int_col", DataType::Int32, true),
1395-
Field::new("string_col", DataType::Utf8, true),
1396-
Field::new("partition_col", DataType::Utf8, true),
1397-
]));
1398-
1399-
let source = FileScanConfig::new(
1394+
FileScanConfig::new(
14001395
ObjectStoreUrl::parse("test:///").unwrap(),
14011396
file_schema.clone(),
14021397
Arc::new(CsvSource::default()),
14031398
)
14041399
.with_file(PartitionedFile::new("x".to_string(), 100))
14051400
.with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)])
14061401
.with_projection(Some(vec![0, 1, 2]))
1407-
.build();
1402+
.build()
1403+
}
1404+
1405+
#[test]
1406+
fn test_partition_col_projection_pushdown() -> Result<()> {
1407+
let source = partitioned_data_source();
1408+
let partitioned_schema = source.schema();
14081409

14091410
let projection = Arc::new(ProjectionExec::try_new(
14101411
vec![
@@ -1427,8 +1428,50 @@ fn test_partition_col_projection_pushdown() -> Result<()> {
14271428
let after_optimize =
14281429
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
14291430

1430-
let expected = ["ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]",
1431-
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false"];
1431+
let expected = [
1432+
"ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]",
1433+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false"
1434+
];
1435+
assert_eq!(get_plan_string(&after_optimize), expected);
1436+
1437+
Ok(())
1438+
}
1439+
1440+
#[test]
1441+
fn test_partition_col_projection_pushdown_expr() -> Result<()> {
1442+
let source = partitioned_data_source();
1443+
let partitioned_schema = source.schema();
1444+
1445+
let projection = Arc::new(ProjectionExec::try_new(
1446+
vec![
1447+
(
1448+
col("string_col", partitioned_schema.as_ref())?,
1449+
"string_col".to_string(),
1450+
),
1451+
(
1452+
// CAST(partition_col, Utf8View)
1453+
cast(
1454+
col("partition_col", partitioned_schema.as_ref())?,
1455+
partitioned_schema.as_ref(),
1456+
DataType::Utf8View,
1457+
)?,
1458+
"partition_col".to_string(),
1459+
),
1460+
(
1461+
col("int_col", partitioned_schema.as_ref())?,
1462+
"int_col".to_string(),
1463+
),
1464+
],
1465+
source,
1466+
)?);
1467+
1468+
let after_optimize =
1469+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1470+
1471+
let expected = [
1472+
"ProjectionExec: expr=[string_col@1 as string_col, CAST(partition_col@2 AS Utf8View) as partition_col, int_col@0 as int_col]",
1473+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false"
1474+
];
14321475
assert_eq!(get_plan_string(&after_optimize), expected);
14331476

14341477
Ok(())

datafusion/datasource/src/file_scan_config.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -264,35 +264,35 @@ impl DataSource for FileScanConfig {
264264
&self,
265265
projection: &ProjectionExec,
266266
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
267-
// If there is any non-column or alias-carrier expression, Projection should not be removed.
268267
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
269268

269+
// Must be all column references, with no table partition columns (which can not be projected)
270270
let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
271271
expr.as_any()
272272
.downcast_ref::<Column>()
273273
.map(|expr| expr.index() >= self.file_schema.fields().len())
274274
.unwrap_or(false)
275275
});
276276

277-
Ok(
278-
(all_alias_free_columns(projection.expr()) && !partitioned_columns_in_proj)
279-
.then(|| {
280-
let file_scan = self.clone();
281-
let source = Arc::clone(&file_scan.file_source);
282-
let new_projections = new_projections_for_columns(
283-
projection,
284-
&file_scan
285-
.projection
286-
.clone()
287-
.unwrap_or((0..self.file_schema.fields().len()).collect()),
288-
);
289-
file_scan
290-
// Assign projected statistics to source
291-
.with_projection(Some(new_projections))
292-
.with_source(source)
293-
.build() as _
294-
}),
295-
)
277+
// If there is any non-column or alias-carrier expression, Projection should not be removed.
278+
let no_aliases = all_alias_free_columns(projection.expr());
279+
280+
Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
281+
let file_scan = self.clone();
282+
let source = Arc::clone(&file_scan.file_source);
283+
let new_projections = new_projections_for_columns(
284+
projection,
285+
&file_scan
286+
.projection
287+
.clone()
288+
.unwrap_or((0..self.file_schema.fields().len()).collect()),
289+
);
290+
file_scan
291+
// Assign projected statistics to source
292+
.with_projection(Some(new_projections))
293+
.with_source(source)
294+
.build() as _
295+
}))
296296
}
297297
}
298298

0 commit comments

Comments (0)