diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 2ef4f236f278..ee4db50a6eda 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -61,7 +61,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; use crate::can_expr_be_pushed_down_with_schemas; -use crate::source::ParquetSource; +use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::source::DataSourceExec; @@ -304,9 +304,10 @@ async fn fetch_schema_with_location( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option, + coerce_int96: Option, ) -> Result<(Path, Schema)> { let loc_path = file.location.clone(); - let schema = fetch_schema(store, file, metadata_size_hint).await?; + let schema = fetch_schema(store, file, metadata_size_hint, coerce_int96).await?; Ok((loc_path, schema)) } @@ -337,12 +338,17 @@ impl FileFormat for ParquetFormat { store: &Arc, objects: &[ObjectMeta], ) -> Result { + let coerce_int96 = match self.coerce_int96() { + Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), + None => None, + }; let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( store.as_ref(), object, self.metadata_size_hint(), + coerce_int96, ) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 @@ -825,6 +831,7 @@ async fn fetch_schema( store: &dyn ObjectStore, file: &ObjectMeta, metadata_size_hint: Option, + coerce_int96: Option, ) -> Result { let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?; let file_metadata = metadata.file_metadata(); @@ -832,6 +839,11 @@ async fn fetch_schema( file_metadata.schema_descr(), file_metadata.key_value_metadata(), )?; + let schema = coerce_int96 + .and_then(|time_unit| { + coerce_int96_to_resolution(file_metadata.schema_descr(), &schema, &time_unit) + }) + .unwrap_or(schema); Ok(schema) } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 6236525fcb9f..e15f5243cd27 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -439,7 +439,9 @@ impl ParquetSource { } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit -fn parse_coerce_int96_string(str_setting: &str) -> datafusion_common::Result { +pub(crate) fn parse_coerce_int96_string( + str_setting: &str, +) -> datafusion_common::Result { let str_setting_lower: &str = &str_setting.to_lowercase(); match str_setting_lower { diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 2970b2effb3e..0823a9218268 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -629,3 +629,21 @@ physical_plan statement ok drop table foo + + +statement ok +set datafusion.execution.parquet.coerce_int96 = ms; + +statement ok +CREATE EXTERNAL TABLE int96_from_spark +STORED AS PARQUET +LOCATION '../../parquet-testing/data/int96_from_spark.parquet'; + +# Print schema +query TTT +describe int96_from_spark; +---- +a Timestamp(Millisecond, None) YES + +statement ok +set datafusion.execution.parquet.coerce_int96 = ns;