From 4e5f42bbe266145b0c17b992a7bd862c127ef182 Mon Sep 17 00:00:00 2001 From: blaginin Date: Sat, 1 Mar 2025 10:44:07 +0000 Subject: [PATCH 1/3] Do not swap with projection when file is partitioned --- datafusion/datasource/src/file_scan_config.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 79279b5c8231..33d4810acb68 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -266,7 +266,10 @@ impl DataSource for FileScanConfig { ) -> Result>> { // If there is any non-column or alias-carrier expression, Projection should not be removed. // This process can be moved into CsvExec, but it would be an overlap of their responsibility. - Ok(all_alias_free_columns(projection.expr()).then(|| { + + Ok((all_alias_free_columns(projection.expr()) + && self.table_partition_cols.is_empty()) + .then(|| { let file_scan = self.clone(); let source = Arc::clone(&file_scan.file_source); let new_projections = new_projections_for_columns( From 55f8b30b993c55eabf4b8e9b34f7c6c25fc6540a Mon Sep 17 00:00:00 2001 From: blaginin Date: Sat, 1 Mar 2025 21:25:56 +0000 Subject: [PATCH 2/3] Narrow the case when not swapping --- datafusion/datasource/src/file_scan_config.rs | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 33d4810acb68..bee74e042f22 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -267,24 +267,32 @@ impl DataSource for FileScanConfig { // If there is any non-column or alias-carrier expression, Projection should not be removed. // This process can be moved into CsvExec, but it would be an overlap of their responsibility. - Ok((all_alias_free_columns(projection.expr()) - && self.table_partition_cols.is_empty()) - .then(|| { - let file_scan = self.clone(); - let source = Arc::clone(&file_scan.file_source); - let new_projections = new_projections_for_columns( - projection, - &file_scan - .projection - .clone() - .unwrap_or((0..self.file_schema.fields().len()).collect()), - ); - file_scan - // Assign projected statistics to source - .with_projection(Some(new_projections)) - .with_source(source) - .build() as _ - })) + let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| { + expr.as_any() + .downcast_ref::() + .map(|expr| expr.index() >= self.file_schema.fields().len()) + .unwrap_or(false) + }); + + Ok( + (all_alias_free_columns(projection.expr()) && !partitioned_columns_in_proj) + .then(|| { + let file_scan = self.clone(); + let source = Arc::clone(&file_scan.file_source); + let new_projections = new_projections_for_columns( + projection, + &file_scan + .projection + .clone() + .unwrap_or((0..self.file_schema.fields().len()).collect()), + ); + file_scan + // Assign projected statistics to source + .with_projection(Some(new_projections)) + .with_source(source) + .build() as _ + }), + ) } } From 8d0f7ea4231fc61b409811ccdf2dac8a09e988ac Mon Sep 17 00:00:00 2001 From: blaginin Date: Sat, 1 Mar 2025 23:09:14 +0000 Subject: [PATCH 3/3] Add test --- .../physical_optimizer/projection_pushdown.rs | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 836758b21318..77c32f562355 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -1382,3 +1382,54 @@ fn test_union_after_projection() -> Result<()> { Ok(()) } + +#[test] +fn test_partition_col_projection_pushdown() -> Result<()> { + let file_schema = Arc::new(Schema::new(vec![ + Field::new("int_col", DataType::Int32, true), + Field::new("string_col", DataType::Utf8, true), + ])); + + let partitioned_schema = Arc::new(Schema::new(vec![ + Field::new("int_col", DataType::Int32, true), + Field::new("string_col", DataType::Utf8, true), + Field::new("partition_col", DataType::Utf8, true), + ])); + + let source = FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + file_schema.clone(), + Arc::new(CsvSource::default()), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)]) + .with_projection(Some(vec![0, 1, 2])) + .build(); + + let projection = Arc::new(ProjectionExec::try_new( + vec![ + ( + col("string_col", partitioned_schema.as_ref())?, + "string_col".to_string(), + ), + ( + col("partition_col", partitioned_schema.as_ref())?, + "partition_col".to_string(), + ), + ( + col("int_col", partitioned_schema.as_ref())?, + "int_col".to_string(), + ), + ], + source, + )?); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = ["ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +}