From 24c0d6c4d3d5c4a0646357ad5d38608b3482f97a Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 17 Apr 2025 22:36:39 +0800 Subject: [PATCH 1/3] fix: parquet coerce_int96 schema --- .../datasource-parquet/src/file_format.rs | 18 ++++++++++++++++-- datafusion/datasource-parquet/src/source.rs | 4 +++- .../test_files/create_external_table.slt | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 2ef4f236f278..c3761f791ff6 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -61,7 +61,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; use crate::can_expr_be_pushed_down_with_schemas; -use crate::source::ParquetSource; +use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::source::DataSourceExec; @@ -304,9 +304,10 @@ async fn fetch_schema_with_location( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option<usize>, + coerce_int96: Option<TimeUnit>, ) -> Result<(Path, Schema)> { let loc_path = file.location.clone(); - let schema = fetch_schema(store, file, metadata_size_hint).await?; + let schema = fetch_schema(store, file, metadata_size_hint, coerce_int96).await?; Ok((loc_path, schema)) } @@ -337,12 +338,17 @@ impl FileFormat for ParquetFormat { store: &Arc<dyn ObjectStore>, objects: &[ObjectMeta], ) -> Result<SchemaRef> { + let coerce_int96 = match self.coerce_int96() { + Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), + None => None, + }; let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( store.as_ref(), object, self.metadata_size_hint(), + coerce_int96, ) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 @@ -825,6 +831,7 @@ async fn fetch_schema( 
store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option<usize>, + coerce_int96: Option<TimeUnit>, ) -> Result<Schema> { let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?; let file_metadata = metadata.file_metadata(); @@ -832,6 +839,13 @@ async fn fetch_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + let schema = match coerce_int96 { + Some(time_unit) => { + coerce_int96_to_resolution(file_metadata.schema_descr(), &schema, &time_unit) + .unwrap_or(schema) + } + None => schema, + }; Ok(schema) } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 6236525fcb9f..e15f5243cd27 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -439,7 +439,9 @@ impl ParquetSource { } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit -fn parse_coerce_int96_string(str_setting: &str) -> datafusion_common::Result<TimeUnit> { +pub(crate) fn parse_coerce_int96_string( + str_setting: &str, +) -> datafusion_common::Result<TimeUnit> { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 03cb5edb5fcc..9bab78557f42 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -297,3 +297,21 @@ CREATE EXTERNAL TABLE staging.foo STORED AS parquet LOCATION '../../parquet-test # Create external table with qualified name, but no schema should error statement error DataFusion error: Error during planning: failed to resolve schema: release CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet'; + + +statement ok +set datafusion.execution.parquet.coerce_int96 = ms; + +statement ok +CREATE EXTERNAL 
TABLE int96_from_spark +STORED AS PARQUET +LOCATION '../../parquet-testing/data/int96_from_spark.parquet'; + +# Print schema +query TTT +describe int96_from_spark; +---- +a Timestamp(Millisecond, None) YES + +statement ok +set datafusion.execution.parquet.coerce_int96 = ns; From 584a2600ecc651c427a739c9c064e75a6f1511f0 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 17 Apr 2025 23:39:56 +0800 Subject: [PATCH 2/3] move test to parquet.slt --- .../test_files/create_external_table.slt | 18 ------------------ datafusion/sqllogictest/test_files/parquet.slt | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 9bab78557f42..03cb5edb5fcc 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -297,21 +297,3 @@ CREATE EXTERNAL TABLE staging.foo STORED AS parquet LOCATION '../../parquet-test # Create external table with qualified name, but no schema should error statement error DataFusion error: Error during planning: failed to resolve schema: release CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet'; - - -statement ok -set datafusion.execution.parquet.coerce_int96 = ms; - -statement ok -CREATE EXTERNAL TABLE int96_from_spark -STORED AS PARQUET -LOCATION '../../parquet-testing/data/int96_from_spark.parquet'; - -# Print schema -query TTT -describe int96_from_spark; ----- -a Timestamp(Millisecond, None) YES - -statement ok -set datafusion.execution.parquet.coerce_int96 = ns; diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 2970b2effb3e..0823a9218268 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -629,3 +629,21 @@ 
physical_plan statement ok drop table foo + + +statement ok +set datafusion.execution.parquet.coerce_int96 = ms; + +statement ok +CREATE EXTERNAL TABLE int96_from_spark +STORED AS PARQUET +LOCATION '../../parquet-testing/data/int96_from_spark.parquet'; + +# Print schema +query TTT +describe int96_from_spark; +---- +a Timestamp(Millisecond, None) YES + +statement ok +set datafusion.execution.parquet.coerce_int96 = ns; From f0e969e0333c443380678d607eeb597861eb33e9 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Sat, 19 Apr 2025 21:57:40 +0800 Subject: [PATCH 3/3] update based on comphead's suggestion --- datafusion/datasource-parquet/src/file_format.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index c3761f791ff6..ee4db50a6eda 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -839,13 +839,11 @@ async fn fetch_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; - let schema = match coerce_int96 { - Some(time_unit) => { + let schema = coerce_int96 + .and_then(|time_unit| { coerce_int96_to_resolution(file_metadata.schema_descr(), &schema, &time_unit) - .unwrap_or(schema) - } - None => schema, - }; + }) + .unwrap_or(schema); Ok(schema) }