From 0600b6eb0913697db1e80fd346f5db930429f712 Mon Sep 17 00:00:00 2001 From: Andrey Koshchiy Date: Thu, 20 Feb 2025 00:54:09 +0300 Subject: [PATCH 1/2] Add parquet writer config --- datafusion/common/src/config.rs | 3 +++ .../common/src/file_options/parquet_writer.rs | 4 +++ .../proto/datafusion_common.proto | 4 +++ datafusion/proto-common/src/from_proto/mod.rs | 6 +++++ .../proto-common/src/generated/pbjson.rs | 25 +++++++++++++++++++ .../proto-common/src/generated/prost.rs | 9 +++++++ datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 9 +++++++ .../proto/src/logical_plan/file_formats.rs | 6 +++++ 9 files changed, 67 insertions(+) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 5e8317c081d9..f2f1dfcc919b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -503,6 +503,9 @@ config_namespace! { /// (writing) Sets column index truncate length pub column_index_truncate_length: Option, default = Some(64) + /// (writing) Sets statictics truncate length + pub statistics_truncate_length: Option, default = None + /// (writing) Sets best effort maximum number of rows in data page pub data_page_row_count_limit: usize, default = 20_000 diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 8c785b84313c..939cb5e1a357 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -219,6 +219,7 @@ impl ParquetOptions { max_row_group_size, created_by, column_index_truncate_length, + statistics_truncate_length, data_page_row_count_limit, encoding, bloom_filter_on_write, @@ -255,6 +256,7 @@ impl ParquetOptions { .set_max_row_group_size(*max_row_group_size) .set_created_by(created_by.clone()) .set_column_index_truncate_length(*column_index_truncate_length) + .set_statistics_truncate_length(*statistics_truncate_length) 
.set_data_page_row_count_limit(*data_page_row_count_limit) .set_bloom_filter_enabled(*bloom_filter_on_write); @@ -491,6 +493,7 @@ mod tests { max_row_group_size: 42, created_by: "wordy".into(), column_index_truncate_length: Some(42), + statistics_truncate_length: Some(42), data_page_row_count_limit: 42, encoding: Some("BYTE_STREAM_SPLIT".into()), bloom_filter_on_write: !defaults.bloom_filter_on_write, @@ -587,6 +590,7 @@ mod tests { max_row_group_size: props.max_row_group_size(), created_by: props.created_by().to_string(), column_index_truncate_length: props.column_index_truncate_length(), + statistics_truncate_length: props.statistics_truncate_length(), data_page_row_count_limit: props.data_page_row_count_limit(), // global options which set the default column props diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 8e5d1283f838..bbeea5e1ec23 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -522,6 +522,10 @@ message ParquetOptions { uint64 column_index_truncate_length = 17; } + oneof statistics_truncate_length_opt { + uint64 statistics_truncate_length = 31; + } + oneof encoding_opt { string encoding = 19; } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 93547efeb51e..da43a9789956 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -952,6 +952,12 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize), }) .unwrap_or(None), + statistics_truncate_length: value + .statistics_truncate_length_opt.as_ref() + .map(|opt| match opt { + protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize), + }) + .unwrap_or(None), 
data_page_row_count_limit: value.data_page_row_count_limit as usize, encoding: value .encoding_opt.clone() diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 8c0a9041ba2c..b0241fd47a26 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -3133,6 +3133,7 @@ impl serde::Serialize for Field { } } impl<'de> serde::Deserialize<'de> for Field { + #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, @@ -4968,6 +4969,9 @@ impl serde::Serialize for ParquetOptions { if self.column_index_truncate_length_opt.is_some() { len += 1; } + if self.statistics_truncate_length_opt.is_some() { + len += 1; + } if self.encoding_opt.is_some() { len += 1; } @@ -5100,6 +5104,15 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.statistics_truncate_length_opt.as_ref() { + match v { + parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("statisticsTruncateLength", ToString::to_string(&v).as_str())?; + } + } + } if let Some(v) = self.encoding_opt.as_ref() { match v { parquet_options::EncodingOpt::Encoding(v) => { @@ -5183,6 +5196,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxStatisticsSize", "column_index_truncate_length", "columnIndexTruncateLength", + "statistics_truncate_length", + "statisticsTruncateLength", "encoding", "bloom_filter_fpp", "bloomFilterFpp", @@ -5218,6 +5233,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { StatisticsEnabled, MaxStatisticsSize, ColumnIndexTruncateLength, + StatisticsTruncateLength, Encoding, BloomFilterFpp, BloomFilterNdv, @@ -5268,6 +5284,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "statisticsEnabled" | "statistics_enabled" => 
Ok(GeneratedField::StatisticsEnabled), "maxStatisticsSize" | "max_statistics_size" => Ok(GeneratedField::MaxStatisticsSize), "columnIndexTruncateLength" | "column_index_truncate_length" => Ok(GeneratedField::ColumnIndexTruncateLength), + "statisticsTruncateLength" | "statistics_truncate_length" => Ok(GeneratedField::StatisticsTruncateLength), "encoding" => Ok(GeneratedField::Encoding), "bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp), "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), @@ -5316,6 +5333,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut statistics_enabled_opt__ = None; let mut max_statistics_size_opt__ = None; let mut column_index_truncate_length_opt__ = None; + let mut statistics_truncate_length_opt__ = None; let mut encoding_opt__ = None; let mut bloom_filter_fpp_opt__ = None; let mut bloom_filter_ndv_opt__ = None; @@ -5491,6 +5509,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } column_index_truncate_length_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(x.0)); } + GeneratedField::StatisticsTruncateLength => { + if statistics_truncate_length_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("statisticsTruncateLength")); + } + statistics_truncate_length_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(x.0)); + } GeneratedField::Encoding => { if encoding_opt__.is_some() { return Err(serde::de::Error::duplicate_field("encoding")); @@ -5538,6 +5562,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { statistics_enabled_opt: statistics_enabled_opt__, max_statistics_size_opt: max_statistics_size_opt__, column_index_truncate_length_opt: column_index_truncate_length_opt__, + statistics_truncate_length_opt: 
statistics_truncate_length_opt__, encoding_opt: encoding_opt__, bloom_filter_fpp_opt: bloom_filter_fpp_opt__, bloom_filter_ndv_opt: bloom_filter_ndv_opt__, diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index db46b47efc1c..b6e9bc137983 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -794,6 +794,10 @@ pub struct ParquetOptions { pub column_index_truncate_length_opt: ::core::option::Option< parquet_options::ColumnIndexTruncateLengthOpt, >, + #[prost(oneof = "parquet_options::StatisticsTruncateLengthOpt", tags = "31")] + pub statistics_truncate_length_opt: ::core::option::Option< + parquet_options::StatisticsTruncateLengthOpt, + >, #[prost(oneof = "parquet_options::EncodingOpt", tags = "19")] pub encoding_opt: ::core::option::Option, #[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")] @@ -833,6 +837,11 @@ pub mod parquet_options { #[prost(uint64, tag = "17")] ColumnIndexTruncateLength(u64), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum StatisticsTruncateLengthOpt { + #[prost(uint64, tag = "31")] + StatisticsTruncateLength(u64), + } #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum EncodingOpt { #[prost(string, tag = "19")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 83c8e98cba97..decd0cf63038 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -823,6 +823,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { max_row_group_size: value.max_row_group_size as u64, created_by: value.created_by.clone(), column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)), + statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| 
protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)), data_page_row_count_limit: value.data_page_row_count_limit as u64, encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding), bloom_filter_on_read: value.bloom_filter_on_read, diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index db46b47efc1c..b6e9bc137983 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -794,6 +794,10 @@ pub struct ParquetOptions { pub column_index_truncate_length_opt: ::core::option::Option< parquet_options::ColumnIndexTruncateLengthOpt, >, + #[prost(oneof = "parquet_options::StatisticsTruncateLengthOpt", tags = "31")] + pub statistics_truncate_length_opt: ::core::option::Option< + parquet_options::StatisticsTruncateLengthOpt, + >, #[prost(oneof = "parquet_options::EncodingOpt", tags = "19")] pub encoding_opt: ::core::option::Option, #[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")] @@ -833,6 +837,11 @@ pub mod parquet_options { #[prost(uint64, tag = "17")] ColumnIndexTruncateLength(u64), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum StatisticsTruncateLengthOpt { + #[prost(uint64, tag = "31")] + StatisticsTruncateLength(u64), + } #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum EncodingOpt { #[prost(string, tag = "19")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 237e6d2a7137..e22738973284 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -394,6 +394,9 @@ impl TableParquetOptionsProto { column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| { parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as 
u64) }), + statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| { + parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64) + }), data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64, encoding_opt: global_options.global.encoding.map(|encoding| { parquet_options::EncodingOpt::Encoding(encoding) @@ -487,6 +490,9 @@ impl From<&ParquetOptionsProto> for ParquetOptions { column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt { parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize, }), + statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt { + parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize, + }), data_page_row_count_limit: proto.data_page_row_count_limit as usize, encoding: proto.encoding_opt.as_ref().map(|opt| match opt { parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(), From e47a42fba252417a95fb093b4cb96f0c9f300f18 Mon Sep 17 00:00:00 2001 From: Andrey Koshchiy Date: Thu, 20 Feb 2025 01:30:32 +0300 Subject: [PATCH 2/2] test fixes --- datafusion/common/src/config.rs | 3 ++- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index f2f1dfcc919b..34c30abd5101 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -503,7 +503,8 @@ config_namespace! { /// (writing) Sets column index truncate length pub column_index_truncate_length: Option, default = Some(64) - /// (writing) Sets statictics truncate length + /// (writing) Sets statistics truncate length. 
If NULL, uses + /// default parquet writer setting pub statistics_truncate_length: Option, default = None /// (writing) Sets best effort maximum number of rows in data page diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5a1caad46732..48b5480d8ee8 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -218,6 +218,7 @@ datafusion.execution.parquet.schema_force_view_types true datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page +datafusion.execution.parquet.statistics_truncate_length NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 @@ -313,6 +314,7 @@ datafusion.execution.parquet.schema_force_view_types true (reading) If true, par datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.statistics_truncate_length NULL (writing) Sets statistics truncate length. 
If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 999735f4c059..a454a1777b64 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -70,6 +70,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. | | datafusion.execution.parquet.created_by | datafusion version 45.0.0 | (writing) Sets "created by" property | | datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | +| datafusion.execution.parquet.statistics_truncate_length | NULL | (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page | | datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. 
If NULL, uses default parquet writer setting | | datafusion.execution.parquet.bloom_filter_on_read | true | (writing) Use any available bloom filters when reading parquet files |