Skip to content

Add statistics_truncate_length parquet writer config #14782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ config_namespace! {
/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = Some(64)

/// (writing) Sets statictics truncate length. If NULL, uses
/// default parquet writer setting
pub statistics_truncate_length: Option<usize>, default = None

/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = 20_000

Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ impl ParquetOptions {
max_row_group_size,
created_by,
column_index_truncate_length,
statistics_truncate_length,
data_page_row_count_limit,
encoding,
bloom_filter_on_write,
Expand Down Expand Up @@ -255,6 +256,7 @@ impl ParquetOptions {
.set_max_row_group_size(*max_row_group_size)
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_statistics_truncate_length(*statistics_truncate_length)
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_on_write);

Expand Down Expand Up @@ -491,6 +493,7 @@ mod tests {
max_row_group_size: 42,
created_by: "wordy".into(),
column_index_truncate_length: Some(42),
statistics_truncate_length: Some(42),
data_page_row_count_limit: 42,
encoding: Some("BYTE_STREAM_SPLIT".into()),
bloom_filter_on_write: !defaults.bloom_filter_on_write,
Expand Down Expand Up @@ -587,6 +590,7 @@ mod tests {
max_row_group_size: props.max_row_group_size(),
created_by: props.created_by().to_string(),
column_index_truncate_length: props.column_index_truncate_length(),
statistics_truncate_length: props.statistics_truncate_length(),
data_page_row_count_limit: props.data_page_row_count_limit(),

// global options which set the default column props
Expand Down
4 changes: 4 additions & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,10 @@ message ParquetOptions {
uint64 column_index_truncate_length = 17;
}

oneof statistics_truncate_length_opt {
uint64 statistics_truncate_length = 31;
}

oneof encoding_opt {
string encoding = 19;
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,12 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
})
.unwrap_or(None),
statistics_truncate_length: value
.statistics_truncate_length_opt.as_ref()
.map(|opt| match opt {
protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
})
.unwrap_or(None),
data_page_row_count_limit: value.data_page_row_count_limit as usize,
encoding: value
.encoding_opt.clone()
Expand Down
25 changes: 25 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3133,6 +3133,7 @@ impl serde::Serialize for Field {
}
}
impl<'de> serde::Deserialize<'de> for Field {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
Expand Down Expand Up @@ -4968,6 +4969,9 @@ impl serde::Serialize for ParquetOptions {
if self.column_index_truncate_length_opt.is_some() {
len += 1;
}
if self.statistics_truncate_length_opt.is_some() {
len += 1;
}
if self.encoding_opt.is_some() {
len += 1;
}
Expand Down Expand Up @@ -5100,6 +5104,15 @@ impl serde::Serialize for ParquetOptions {
}
}
}
if let Some(v) = self.statistics_truncate_length_opt.as_ref() {
match v {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("statisticsTruncateLength", ToString::to_string(&v).as_str())?;
}
}
}
if let Some(v) = self.encoding_opt.as_ref() {
match v {
parquet_options::EncodingOpt::Encoding(v) => {
Expand Down Expand Up @@ -5183,6 +5196,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"maxStatisticsSize",
"column_index_truncate_length",
"columnIndexTruncateLength",
"statistics_truncate_length",
"statisticsTruncateLength",
"encoding",
"bloom_filter_fpp",
"bloomFilterFpp",
Expand Down Expand Up @@ -5218,6 +5233,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
StatisticsEnabled,
MaxStatisticsSize,
ColumnIndexTruncateLength,
StatisticsTruncateLength,
Encoding,
BloomFilterFpp,
BloomFilterNdv,
Expand Down Expand Up @@ -5268,6 +5284,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"statisticsEnabled" | "statistics_enabled" => Ok(GeneratedField::StatisticsEnabled),
"maxStatisticsSize" | "max_statistics_size" => Ok(GeneratedField::MaxStatisticsSize),
"columnIndexTruncateLength" | "column_index_truncate_length" => Ok(GeneratedField::ColumnIndexTruncateLength),
"statisticsTruncateLength" | "statistics_truncate_length" => Ok(GeneratedField::StatisticsTruncateLength),
"encoding" => Ok(GeneratedField::Encoding),
"bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp),
"bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv),
Expand Down Expand Up @@ -5316,6 +5333,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut statistics_enabled_opt__ = None;
let mut max_statistics_size_opt__ = None;
let mut column_index_truncate_length_opt__ = None;
let mut statistics_truncate_length_opt__ = None;
let mut encoding_opt__ = None;
let mut bloom_filter_fpp_opt__ = None;
let mut bloom_filter_ndv_opt__ = None;
Expand Down Expand Up @@ -5491,6 +5509,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
column_index_truncate_length_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(x.0));
}
GeneratedField::StatisticsTruncateLength => {
if statistics_truncate_length_opt__.is_some() {
return Err(serde::de::Error::duplicate_field("statisticsTruncateLength"));
}
statistics_truncate_length_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(x.0));
}
GeneratedField::Encoding => {
if encoding_opt__.is_some() {
return Err(serde::de::Error::duplicate_field("encoding"));
Expand Down Expand Up @@ -5538,6 +5562,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
statistics_enabled_opt: statistics_enabled_opt__,
max_statistics_size_opt: max_statistics_size_opt__,
column_index_truncate_length_opt: column_index_truncate_length_opt__,
statistics_truncate_length_opt: statistics_truncate_length_opt__,
encoding_opt: encoding_opt__,
bloom_filter_fpp_opt: bloom_filter_fpp_opt__,
bloom_filter_ndv_opt: bloom_filter_ndv_opt__,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,10 @@ pub struct ParquetOptions {
pub column_index_truncate_length_opt: ::core::option::Option<
parquet_options::ColumnIndexTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::StatisticsTruncateLengthOpt", tags = "31")]
pub statistics_truncate_length_opt: ::core::option::Option<
parquet_options::StatisticsTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::EncodingOpt", tags = "19")]
pub encoding_opt: ::core::option::Option<parquet_options::EncodingOpt>,
#[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")]
Expand Down Expand Up @@ -833,6 +837,11 @@ pub mod parquet_options {
#[prost(uint64, tag = "17")]
ColumnIndexTruncateLength(u64),
}
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum StatisticsTruncateLengthOpt {
#[prost(uint64, tag = "31")]
StatisticsTruncateLength(u64),
}
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum EncodingOpt {
#[prost(string, tag = "19")]
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
max_row_group_size: value.max_row_group_size as u64,
created_by: value.created_by.clone(),
column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
statistics_truncate_length_opt: value.statistics_truncate_length.map(|v| protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v as u64)),
data_page_row_count_limit: value.data_page_row_count_limit as u64,
encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
bloom_filter_on_read: value.bloom_filter_on_read,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,10 @@ pub struct ParquetOptions {
pub column_index_truncate_length_opt: ::core::option::Option<
parquet_options::ColumnIndexTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::StatisticsTruncateLengthOpt", tags = "31")]
pub statistics_truncate_length_opt: ::core::option::Option<
parquet_options::StatisticsTruncateLengthOpt,
>,
#[prost(oneof = "parquet_options::EncodingOpt", tags = "19")]
pub encoding_opt: ::core::option::Option<parquet_options::EncodingOpt>,
#[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")]
Expand Down Expand Up @@ -833,6 +837,11 @@ pub mod parquet_options {
#[prost(uint64, tag = "17")]
ColumnIndexTruncateLength(u64),
}
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum StatisticsTruncateLengthOpt {
#[prost(uint64, tag = "31")]
StatisticsTruncateLength(u64),
}
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum EncodingOpt {
#[prost(string, tag = "19")]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ impl TableParquetOptionsProto {
column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| {
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64)
}),
statistics_truncate_length_opt: global_options.global.statistics_truncate_length.map(|length| {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length as u64)
}),
data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64,
encoding_opt: global_options.global.encoding.map(|encoding| {
parquet_options::EncodingOpt::Encoding(encoding)
Expand Down Expand Up @@ -487,6 +490,9 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt {
parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize,
}),
statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt {
parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize,
}),
data_page_row_count_limit: proto.data_page_row_count_limit as usize,
encoding: proto.encoding_opt.as_ref().map(|opt| match opt {
parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ datafusion.execution.parquet.schema_force_view_types true
datafusion.execution.parquet.skip_arrow_metadata false
datafusion.execution.parquet.skip_metadata true
datafusion.execution.parquet.statistics_enabled page
datafusion.execution.parquet.statistics_truncate_length NULL
datafusion.execution.parquet.write_batch_size 1024
datafusion.execution.parquet.writer_version 1.0
datafusion.execution.planning_concurrency 13
Expand Down Expand Up @@ -313,6 +314,7 @@ datafusion.execution.parquet.schema_force_view_types true (reading) If true, par
datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.statistics_truncate_length NULL (writing) Sets statictics truncate length. If NULL, uses default parquet writer setting
datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes
datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. |
| datafusion.execution.parquet.created_by | datafusion version 45.0.0 | (writing) Sets "created by" property |
| datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length |
| datafusion.execution.parquet.statistics_truncate_length | NULL | (writing) Sets statictics truncate length. If NULL, uses default parquet writer setting |
| datafusion.execution.parquet.data_page_row_count_limit | 20000 | (writing) Sets best effort maximum number of rows in data page |
| datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting |
| datafusion.execution.parquet.bloom_filter_on_read | true | (writing) Use any available bloom filters when reading parquet files |
Expand Down