diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 5b841db53c5e..807d885b3a4d 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -21,6 +21,7 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; +use crate::error::_plan_err; use arrow::datatypes::{DataType, Schema, SchemaRef}; /// Represents a value with a degree of certainty. `Precision` is used to @@ -271,11 +272,25 @@ pub struct Statistics { pub num_rows: Precision, /// Total bytes of the table rows. pub total_byte_size: Precision, - /// Statistics on a column level. It contains a [`ColumnStatistics`] for - /// each field in the schema of the table to which the [`Statistics`] refer. + /// Statistics on a column level. + /// + /// It must contains a [`ColumnStatistics`] for each field in the schema of + /// the table to which the [`Statistics`] refer. pub column_statistics: Vec, } +impl Default for Statistics { + /// Returns a new [`Statistics`] instance with all fields set to unknown + /// and no columns. + fn default() -> Self { + Self { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics: vec![], + } + } +} + impl Statistics { /// Returns a [`Statistics`] instance for the given schema by assigning /// unknown statistics to each column in the schema. @@ -296,6 +311,24 @@ impl Statistics { .collect() } + /// Set the number of rows + pub fn with_num_rows(mut self, num_rows: Precision) -> Self { + self.num_rows = num_rows; + self + } + + /// Set the total size, in bytes + pub fn with_total_byte_size(mut self, total_byte_size: Precision) -> Self { + self.total_byte_size = total_byte_size; + self + } + + /// Add a column to the column statistics + pub fn add_column_statistics(mut self, column_stats: ColumnStatistics) -> Self { + self.column_statistics.push(column_stats); + self + } + /// If the exactness of a [`Statistics`] instance is lost, this function relaxes /// the exactness of all information by converting them [`Precision::Inexact`]. pub fn to_inexact(mut self) -> Self { @@ -351,7 +384,8 @@ impl Statistics { self } - /// Calculates the statistics after `fetch` and `skip` operations apply. + /// Calculates the statistics after applying `fetch` and `skip` operations. + /// /// Here, `self` denotes per-partition statistics. Use the `n_partitions` /// parameter to compute global statistics in a multi-partition setting. pub fn with_fetch( @@ -414,6 +448,100 @@ impl Statistics { self.total_byte_size = Precision::Absent; Ok(self) } + + /// Summarize zero or more statistics into a single `Statistics` instance. + /// + /// Returns an error if the statistics do not match the specified schemas. + pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result + where + I: IntoIterator, + { + let mut items = items.into_iter(); + + let Some(init) = items.next() else { + return Ok(Statistics::new_unknown(schema)); + }; + items.try_fold(init.clone(), |acc: Statistics, item_stats: &Statistics| { + acc.try_merge(item_stats) + }) + } + + /// Merge this Statistics value with another Statistics value. + /// + /// Returns an error if the statistics do not match (different schemas). + /// + /// # Example + /// ``` + /// # use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; + /// # use arrow::datatypes::{Field, Schema, DataType}; + /// # use datafusion_common::stats::Precision; + /// let stats1 = Statistics::default() + /// .with_num_rows(Precision::Exact(1)) + /// .with_total_byte_size(Precision::Exact(2)) + /// .add_column_statistics(ColumnStatistics::new_unknown() + /// .with_null_count(Precision::Exact(3)) + /// .with_min_value(Precision::Exact(ScalarValue::from(4))) + /// .with_max_value(Precision::Exact(ScalarValue::from(5))) + /// ); + /// + /// let stats2 = Statistics::default() + /// .with_num_rows(Precision::Exact(10)) + /// .with_total_byte_size(Precision::Inexact(20)) + /// .add_column_statistics(ColumnStatistics::new_unknown() + /// // absent null count + /// .with_min_value(Precision::Exact(ScalarValue::from(40))) + /// .with_max_value(Precision::Exact(ScalarValue::from(50))) + /// ); + /// + /// let merged_stats = stats1.try_merge(&stats2).unwrap(); + /// let expected_stats = Statistics::default() + /// .with_num_rows(Precision::Exact(11)) + /// .with_total_byte_size(Precision::Inexact(22)) // inexact in stats2 --> inexact + /// .add_column_statistics( + /// ColumnStatistics::new_unknown() + /// .with_null_count(Precision::Absent) // missing from stats2 --> absent + /// .with_min_value(Precision::Exact(ScalarValue::from(4))) + /// .with_max_value(Precision::Exact(ScalarValue::from(50))) + /// ); + /// + /// assert_eq!(merged_stats, expected_stats) + /// ``` + pub fn try_merge(self, other: &Statistics) -> Result { + let Self { + mut num_rows, + mut total_byte_size, + mut column_statistics, + } = self; + + // Accumulate statistics for subsequent items + num_rows = num_rows.add(&other.num_rows); + total_byte_size = total_byte_size.add(&other.total_byte_size); + + if column_statistics.len() != other.column_statistics.len() { + return _plan_err!( + "Cannot merge statistics with different number of columns: {} vs {}", + column_statistics.len(), + other.column_statistics.len() + ); + } + + for (item_col_stats, col_stats) in other + .column_statistics + .iter() + .zip(column_statistics.iter_mut()) + { + col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count); + col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); + col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value); + col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); + } + + Ok(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) + } } /// Creates an estimate of the number of rows in the output using the given @@ -521,6 +649,36 @@ impl ColumnStatistics { } } + /// Set the null count + pub fn with_null_count(mut self, null_count: Precision) -> Self { + self.null_count = null_count; + self + } + + /// Set the max value + pub fn with_max_value(mut self, max_value: Precision) -> Self { + self.max_value = max_value; + self + } + + /// Set the min value + pub fn with_min_value(mut self, min_value: Precision) -> Self { + self.min_value = min_value; + self + } + + /// Set the sum value + pub fn with_sum_value(mut self, sum_value: Precision) -> Self { + self.sum_value = sum_value; + self + } + + /// Set the distinct count + pub fn with_distinct_count(mut self, distinct_count: Precision) -> Self { + self.distinct_count = distinct_count; + self + } + /// If the exactness of a [`ColumnStatistics`] instance is lost, this /// function relaxes the exactness of all information by converting them /// [`Precision::Inexact`]. @@ -537,6 +695,9 @@ impl ColumnStatistics { #[cfg(test)] mod tests { use super::*; + use crate::assert_contains; + use arrow::datatypes::Field; + use std::sync::Arc; #[test] fn test_get_value() { @@ -798,4 +959,193 @@ mod tests { distinct_count: Precision::Exact(100), } } + + #[test] + fn test_try_merge_basic() { + // Create a schema with two columns + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int32, false), + ])); + + // Create items with statistics + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(200))), + min_value: Precision::Exact(ScalarValue::Int32(Some(10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))), + distinct_count: Precision::Absent, + }, + ], + }; + + let stats2 = Statistics { + num_rows: Precision::Exact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(3), + max_value: Precision::Exact(ScalarValue::Int32(Some(180))), + min_value: Precision::Exact(ScalarValue::Int32(Some(5))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), + distinct_count: Precision::Absent, + }, + ], + }; + + let items = vec![stats1, stats2]; + + let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap(); + + // Verify the results + assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 + assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150 + + // Verify column statistics + let col1_stats = &summary_stats.column_statistics[0]; + assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2 + assert_eq!( + col1_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col1_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(-10))) + ); + assert_eq!( + col1_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(1100))) + ); // 500 + 600 + + let col2_stats = &summary_stats.column_statistics[1]; + assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3 + assert_eq!( + col2_stats.max_value, + Precision::Exact(ScalarValue::Int32(Some(200))) + ); + assert_eq!( + col2_stats.min_value, + Precision::Exact(ScalarValue::Int32(Some(5))) + ); + assert_eq!( + col2_stats.sum_value, + Precision::Exact(ScalarValue::Int32(Some(2200))) + ); // 1000 + 1200 + } + + #[test] + fn test_try_merge_mixed_precision() { + // Create a schema with one column + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + // Create items with different precision levels + let stats1 = Statistics { + num_rows: Precision::Exact(10), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), + distinct_count: Precision::Absent, + }], + }; + + let stats2 = Statistics { + num_rows: Precision::Inexact(15), + total_byte_size: Precision::Exact(150), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(2), + max_value: Precision::Inexact(ScalarValue::Int32(Some(120))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + let items = vec![stats1, stats2]; + + let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap(); + + assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); + assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); + + let col_stats = &summary_stats.column_statistics[0]; + assert_eq!(col_stats.null_count, Precision::Inexact(3)); + assert_eq!( + col_stats.max_value, + Precision::Inexact(ScalarValue::Int32(Some(120))) + ); + assert_eq!( + col_stats.min_value, + Precision::Inexact(ScalarValue::Int32(Some(-10))) + ); + assert!(matches!(col_stats.sum_value, Precision::Absent)); + } + + #[test] + fn test_try_merge_empty() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + // Empty collection + let items: Vec = vec![]; + + let summary_stats = Statistics::try_merge_iter(&items, &schema).unwrap(); + + // Verify default values for empty collection + assert_eq!(summary_stats.num_rows, Precision::Absent); + assert_eq!(summary_stats.total_byte_size, Precision::Absent); + assert_eq!(summary_stats.column_statistics.len(), 1); + assert_eq!( + summary_stats.column_statistics[0].null_count, + Precision::Absent + ); + } + + #[test] + fn test_try_merge_mismatched_size() { + // Create a schema with one column + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + + // No column statistics + let stats1 = Statistics::default(); + + let stats2 = + Statistics::default().add_column_statistics(ColumnStatistics::new_unknown()); + + let items = vec![stats1, stats2]; + + let e = Statistics::try_merge_iter(&items, &schema).unwrap_err(); + assert_contains!(e.to_string(), "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1"); + } } diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 1c3d1111e5b4..8a04d77b273d 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -406,62 +406,6 @@ pub async fn get_statistics_with_limit( Ok((result_files, statistics)) } -/// Generic function to compute statistics across multiple items that have statistics -fn compute_summary_statistics( - items: I, - file_schema: &SchemaRef, - stats_extractor: impl Fn(&T) -> Option<&Statistics>, -) -> Statistics -where - I: IntoIterator, -{ - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; - let mut num_rows = Precision::::Absent; - let mut total_byte_size = Precision::::Absent; - - for (idx, item) in items.into_iter().enumerate() { - if let Some(item_stats) = stats_extractor(&item) { - if idx == 0 { - // First item, set values directly - num_rows = item_stats.num_rows; - total_byte_size = item_stats.total_byte_size; - for (index, column_stats) in - item_stats.column_statistics.iter().enumerate() - { - col_stats_set[index].null_count = column_stats.null_count; - col_stats_set[index].max_value = column_stats.max_value.clone(); - col_stats_set[index].min_value = column_stats.min_value.clone(); - col_stats_set[index].sum_value = column_stats.sum_value.clone(); - } - continue; - } - - // Accumulate statistics for subsequent items - num_rows = num_rows.add(&item_stats.num_rows); - total_byte_size = total_byte_size.add(&item_stats.total_byte_size); - - for (item_col_stats, col_stats) in item_stats - .column_statistics - .iter() - .zip(col_stats_set.iter_mut()) - { - col_stats.null_count = - col_stats.null_count.add(&item_col_stats.null_count); - col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); - col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value); - col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value); - } - } - } - - Statistics { - num_rows, - total_byte_size, - column_statistics: col_stats_set, - } -} - /// Computes the summary statistics for a group of files(`FileGroup` level's statistics). /// /// This function combines statistics from all files in the file group to create @@ -486,10 +430,11 @@ pub fn compute_file_group_statistics( return Ok(file_group); } - let statistics = - compute_summary_statistics(file_group.iter(), &file_schema, |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); + let file_group_stats = file_group.iter().filter_map(|file| { + let stats = file.statistics.as_ref()?; + Some(stats.as_ref()) + }); + let statistics = Statistics::try_merge_iter(file_group_stats, &file_schema)?; Ok(file_group.with_statistics(Arc::new(statistics))) } @@ -517,23 +462,24 @@ pub fn compute_all_files_statistics( collect_stats: bool, inexact_stats: bool, ) -> Result<(Vec, Statistics)> { - let mut file_groups_with_stats = Vec::with_capacity(file_groups.len()); - - // First compute statistics for each file group - for file_group in file_groups { - file_groups_with_stats.push(compute_file_group_statistics( - file_group, - Arc::clone(&table_schema), - collect_stats, - )?); - } + let file_groups_with_stats = file_groups + .into_iter() + .map(|file_group| { + compute_file_group_statistics( + file_group, + Arc::clone(&table_schema), + collect_stats, + ) + }) + .collect::>>()?; // Then summary statistics across all file groups - let mut statistics = compute_summary_statistics( - &file_groups_with_stats, - &table_schema, - |file_group| file_group.statistics(), - ); + let file_groups_statistics = file_groups_with_stats + .iter() + .filter_map(|file_group| file_group.statistics()); + + let mut statistics = + Statistics::try_merge_iter(file_groups_statistics, &table_schema)?; if inexact_stats { statistics = statistics.to_inexact() @@ -549,183 +495,3 @@ pub fn add_row_stats( ) -> Precision { file_num_rows.add(&num_rows) } - -#[cfg(test)] -mod tests { - use super::*; - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::ScalarValue; - use std::sync::Arc; - - #[test] - fn test_compute_summary_statistics_basic() { - // Create a schema with two columns - let schema = Arc::new(Schema::new(vec![ - Field::new("col1", DataType::Int32, false), - Field::new("col2", DataType::Int32, false), - ])); - - // Create items with statistics - let stats1 = Statistics { - num_rows: Precision::Exact(10), - total_byte_size: Precision::Exact(100), - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Exact(1), - max_value: Precision::Exact(ScalarValue::Int32(Some(100))), - min_value: Precision::Exact(ScalarValue::Int32(Some(1))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), - distinct_count: Precision::Absent, - }, - ColumnStatistics { - null_count: Precision::Exact(2), - max_value: Precision::Exact(ScalarValue::Int32(Some(200))), - min_value: Precision::Exact(ScalarValue::Int32(Some(10))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))), - distinct_count: Precision::Absent, - }, - ], - }; - - let stats2 = Statistics { - num_rows: Precision::Exact(15), - total_byte_size: Precision::Exact(150), - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Exact(2), - max_value: Precision::Exact(ScalarValue::Int32(Some(120))), - min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(600))), - distinct_count: Precision::Absent, - }, - ColumnStatistics { - null_count: Precision::Exact(3), - max_value: Precision::Exact(ScalarValue::Int32(Some(180))), - min_value: Precision::Exact(ScalarValue::Int32(Some(5))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))), - distinct_count: Precision::Absent, - }, - ], - }; - - let items = vec![Arc::new(stats1), Arc::new(stats2)]; - - // Call compute_summary_statistics - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); - - // Verify the results - assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 - assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150 - - // Verify column statistics - let col1_stats = &summary_stats.column_statistics[0]; - assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2 - assert_eq!( - col1_stats.max_value, - Precision::Exact(ScalarValue::Int32(Some(120))) - ); - assert_eq!( - col1_stats.min_value, - Precision::Exact(ScalarValue::Int32(Some(-10))) - ); - assert_eq!( - col1_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(1100))) - ); // 500 + 600 - - let col2_stats = &summary_stats.column_statistics[1]; - assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3 - assert_eq!( - col2_stats.max_value, - Precision::Exact(ScalarValue::Int32(Some(200))) - ); - assert_eq!( - col2_stats.min_value, - Precision::Exact(ScalarValue::Int32(Some(5))) - ); - assert_eq!( - col2_stats.sum_value, - Precision::Exact(ScalarValue::Int32(Some(2200))) - ); // 1000 + 1200 - } - - #[test] - fn test_compute_summary_statistics_mixed_precision() { - // Create a schema with one column - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - - // Create items with different precision levels - let stats1 = Statistics { - num_rows: Precision::Exact(10), - total_byte_size: Precision::Inexact(100), - column_statistics: vec![ColumnStatistics { - null_count: Precision::Exact(1), - max_value: Precision::Exact(ScalarValue::Int32(Some(100))), - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - sum_value: Precision::Exact(ScalarValue::Int32(Some(500))), - distinct_count: Precision::Absent, - }], - }; - - let stats2 = Statistics { - num_rows: Precision::Inexact(15), - total_byte_size: Precision::Exact(150), - column_statistics: vec![ColumnStatistics { - null_count: Precision::Inexact(2), - max_value: Precision::Inexact(ScalarValue::Int32(Some(120))), - min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - }], - }; - - let items = vec![Arc::new(stats1), Arc::new(stats2)]; - - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); - - assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); - assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); - - let col_stats = &summary_stats.column_statistics[0]; - assert_eq!(col_stats.null_count, Precision::Inexact(3)); - assert_eq!( - col_stats.max_value, - Precision::Inexact(ScalarValue::Int32(Some(120))) - ); - assert_eq!( - col_stats.min_value, - Precision::Inexact(ScalarValue::Int32(Some(-10))) - ); - assert!(matches!(col_stats.sum_value, Precision::Absent)); - } - - #[test] - fn test_compute_summary_statistics_empty() { - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - - // Empty collection - let items: Vec> = vec![]; - - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); - - // Verify default values for empty collection - assert_eq!(summary_stats.num_rows, Precision::Absent); - assert_eq!(summary_stats.total_byte_size, Precision::Absent); - assert_eq!(summary_stats.column_statistics.len(), 1); - assert_eq!( - summary_stats.column_statistics[0].null_count, - Precision::Absent - ); - } -}