diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 61eeb419a480..79db5ecf5229 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,18 +17,16 @@
 //! The table implementation.
 
-use std::collections::HashMap;
-use std::{any::Any, str::FromStr, sync::Arc};
-
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
 use super::{ListingTableUrl, PartitionedFile};
+use std::collections::HashMap;
+use std::{any::Any, str::FromStr, sync::Arc};
 
 use crate::datasource::{
     create_ordering,
     file_format::{
         file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
     },
-    get_statistics_with_limit,
     physical_plan::FileSinkConfig,
 };
 use crate::execution::context::SessionState;
@@ -55,9 +53,12 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_common::stats::Precision;
+use datafusion_datasource::add_row_stats;
+use datafusion_datasource::compute_all_files_statistics;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use futures::{future, stream, StreamExt, TryStreamExt};
+use futures::{future, stream, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::ObjectStore;
@@ -1115,32 +1116,26 @@ impl ListingTable {
         let files = file_list
             .map(|part_file| async {
                 let part_file = part_file?;
-                if self.options.collect_stat {
-                    let statistics =
-                        self.do_collect_statistics(ctx, &store, &part_file).await?;
-                    Ok((part_file, statistics))
+                let statistics = if self.options.collect_stat {
+                    self.do_collect_statistics(ctx, &store, &part_file).await?
                 } else {
-                    Ok((
-                        part_file,
-                        Arc::new(Statistics::new_unknown(&self.file_schema)),
-                    ))
-                }
+                    Arc::new(Statistics::new_unknown(&self.file_schema))
+                };
+                Ok(part_file.with_statistics(statistics))
             })
             .boxed()
             .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
 
-        let (files, statistics) = get_statistics_with_limit(
-            files,
+        let (file_group, inexact_stats) =
+            get_files_with_limit(files, limit, self.options.collect_stat).await?;
+
+        let file_groups = file_group.split_files(self.options.target_partitions);
+        compute_all_files_statistics(
+            file_groups,
             self.schema(),
-            limit,
             self.options.collect_stat,
+            inexact_stats,
         )
-        .await?;
-
-        Ok((
-            files.split_files(self.options.target_partitions),
-            statistics,
-        ))
     }
 
     /// Collects statistics for a given partitioned file.
@@ -1182,6 +1177,86 @@ impl ListingTable {
     }
 }
 
+/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
+///
+/// This function collects files from the provided stream until either:
+/// 1. The stream is exhausted
+/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
+///
+/// # Arguments
+/// * `files` - A stream of `Result<PartitionedFile>` items to process
+/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
+///   once the accumulated number of rows exceeds this limit
+/// * `collect_stats` - Whether to collect and accumulate statistics from the files
+///
+/// # Returns
+/// A `Result` containing a `FileGroup` with the collected files
+/// and a boolean indicating whether the statistics are inexact.
+///
+/// # Note
+/// The function will continue processing files if statistics are not available or if the
+/// limit is not provided.
+/// If `collect_stats` is false, statistics won't be accumulated
+/// but files will still be collected.
+async fn get_files_with_limit(
+    files: impl Stream<Item = Result<PartitionedFile>>,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, bool)> {
+    let mut file_group = FileGroup::default();
+    // Fusing the stream allows us to call next safely even once it is finished.
+    let mut all_files = Box::pin(files.fuse());
+    let mut num_rows = Precision::<usize>::Absent;
+    while let Some(first_file) = all_files.next().await {
+        let file = first_file?;
+        if let Some(file_statistic) = &file.statistics {
+            num_rows = file_statistic.num_rows;
+        }
+        file_group.push(file);
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        let conservative_num_rows = match num_rows {
+            Precision::Exact(nr) => nr,
+            _ => usize::MIN,
+        };
+        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+            while let Some(current) = all_files.next().await {
+                let file = current?;
+                if !collect_stats {
+                    file_group.push(file);
+                    continue;
+                }
+
+                // We accumulate the number of rows across all the files in
+                // question. If any file does not provide any information or
+                // provides an inexact value, we demote the statistic precision
+                // to inexact.
+                if let Some(file_stats) = &file.statistics {
+                    num_rows = add_row_stats(num_rows, file_stats.num_rows);
+                }
+                file_group.push(file);
+
+                // If the number of rows exceeds the limit, we can stop processing
+                // files. This only applies when we know the number of rows. It also
+                // currently ignores tables that have no statistics regarding the
+                // number of rows.
+                if num_rows.get_value().unwrap_or(&usize::MIN)
+                    > &limit.unwrap_or(usize::MAX)
+                {
+                    break;
+                }
+            }
+        }
+    }
+    // If we still have files in the stream, it means that the limit kicked
+    // in, and the statistics could have been different had we processed the
+    // files in a different order.
+    let inexact_stats = all_files.next().await.is_some();
+    Ok((file_group, inexact_stats))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
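The limit-based early exit above is easiest to see in isolation. The following is a standalone sketch, not part of the patch: the `Precision` enum is a simplified stand-in for `datafusion_common::stats::Precision`, and the accumulation only approximates `add_row_stats` (which also demotes mixed exact/inexact inputs to inexact).

#[derive(Clone, Copy)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl Precision {
    fn get_value(&self) -> Option<usize> {
        match self {
            Precision::Exact(v) | Precision::Inexact(v) => Some(*v),
            Precision::Absent => None,
        }
    }
}

/// Returns how many files get collected before the accumulated row count
/// exceeds `limit`, mirroring the inner loop of `get_files_with_limit`.
fn files_needed(file_row_counts: &[Precision], limit: Option<usize>) -> usize {
    let mut num_rows = Precision::Absent;
    let mut collected = 0;
    for stats in file_row_counts {
        num_rows = match (num_rows, stats) {
            // First piece of information: take it as-is.
            (Precision::Absent, p) => *p,
            // Exact plus exact stays exact ...
            (Precision::Exact(a), Precision::Exact(b)) => Precision::Exact(a + b),
            // ... anything else is demoted to an inexact sum.
            (acc, p) => Precision::Inexact(
                acc.get_value().unwrap_or(0) + p.get_value().unwrap_or(0),
            ),
        };
        collected += 1;
        // Files without statistics never trip the limit, so such streams
        // are always drained fully.
        if num_rows.get_value().unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
            break;
        }
    }
    collected
}

fn main() {
    let files = [Precision::Exact(50), Precision::Exact(60), Precision::Exact(70)];
    // 50 + 60 = 110 > 100, so the third file is never fetched.
    assert_eq!(files_needed(&files, Some(100)), 2);
    // Without a limit, every file is collected.
    assert_eq!(files_needed(&files, None), 3);
}

Because the row count is only trusted for the early exit when it is known, this is also why `get_files_with_limit` reports whether the limit cut the listing short: the resulting statistics are then marked inexact.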
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index a195c9a882dd..a15b2b6ffe13 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -42,7 +42,6 @@ pub use datafusion_catalog::default_table_source;
 pub use datafusion_catalog::memory;
 pub use datafusion_catalog::stream;
 pub use datafusion_catalog::view;
-pub use datafusion_datasource::get_statistics_with_limit;
 pub use datafusion_datasource::schema_adapter;
 pub use datafusion_datasource::sink;
 pub use datafusion_datasource::source;
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index 5fe3e25eaa1f..75c4160f145e 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -418,6 +418,11 @@ impl FileGroup {
         self.files.push(file);
     }
 
+    /// Get the statistics for this group
+    pub fn statistics(&self) -> Option<&Statistics> {
+        self.statistics.as_ref()
+    }
+
     /// Partition the list of files into `n` groups
     pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
         if self.is_empty() {
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 5172dafb1f91..729283289caf 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -2000,7 +2000,7 @@ mod tests {
                 },
                 partition_values: vec![ScalarValue::from(file.date)],
                 range: None,
-                statistics: Some(Statistics {
+                statistics: Some(Arc::new(Statistics {
                     num_rows: Precision::Absent,
                     total_byte_size: Precision::Absent,
                     column_statistics: file
@@ -2020,7 +2020,7 @@ mod tests {
                             .unwrap_or_default()
                     })
                     .collect::<Vec<_>>(),
-                }),
+                })),
                 extensions: None,
                 metadata_size_hint: None,
             }
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 1e7ea1255df7..e4461c0b90a4 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -60,7 +60,8 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
-pub use statistics::get_statistics_with_limit;
+pub use statistics::add_row_stats;
+pub use statistics::compute_all_files_statistics;
 
 /// Stream of files get listed from object store
 pub type PartitionedFileStream =
@@ -107,7 +108,7 @@ pub struct PartitionedFile {
     ///
     /// DataFusion relies on these statistics for planning (in particular to sort file groups),
     /// so if they are incorrect, incorrect answers may result.
-    pub statistics: Option<Statistics>,
+    pub statistics: Option<Arc<Statistics>>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
     /// The estimated size of the parquet metadata, in bytes
@@ -187,6 +188,12 @@ impl PartitionedFile {
         self.extensions = Some(extensions);
         self
     }
+
+    /// Update the statistics for this file.
+    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
+        self.statistics = Some(statistics);
+        self
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
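With `PartitionedFile::statistics` now an `Option<Arc<Statistics>>`, one statistics object can be shared across files instead of deep-copied per file. A hedged usage sketch (assuming the `datafusion-datasource`, `datafusion-common`, and `arrow` crates from this workspace; the file names are made up):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Statistics;
use datafusion_datasource::PartitionedFile;

fn main() {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    // Unknown statistics are cheap to share: every file points at the same Arc.
    let unknown = Arc::new(Statistics::new_unknown(&schema));

    let files: Vec<PartitionedFile> = (0..3)
        .map(|i| {
            PartitionedFile::new(format!("part-{i}.parquet"), 1024)
                .with_statistics(Arc::clone(&unknown))
        })
        .collect();

    // All three files share a single allocation.
    assert!(files
        .iter()
        .all(|f| Arc::ptr_eq(f.statistics.as_ref().unwrap(), &unknown)));
}

This is also what the `get_statistics_with_limit` hunks below exploit: `Arc::clone(&file_stats)` is a reference-count bump where `file_stats.as_ref().clone()` previously cloned the whole `Statistics`.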
diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs
index 801315568a0d..7e875513f03f 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -293,6 +293,11 @@ fn sort_columns_from_physical_sort_exprs(
 /// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
 /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
 /// call to `multiunzip` for constructing file level summary statistics.
+#[deprecated(
+    since = "47.0.0",
+    note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
+)]
+#[allow(unused)]
 pub async fn get_statistics_with_limit(
     all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
     file_schema: SchemaRef,
@@ -316,7 +321,7 @@ pub async fn get_statistics_with_limit(
     if let Some(first_file) = all_files.next().await {
         let (mut file, file_stats) = first_file?;
-        file.statistics = Some(file_stats.as_ref().clone());
+        file.statistics = Some(Arc::clone(&file_stats));
         result_files.push(file);
 
         // First file, we set them directly from the file statistics.
@@ -342,7 +347,7 @@ pub async fn get_statistics_with_limit(
         if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
             while let Some(current) = all_files.next().await {
                 let (mut file, file_stats) = current?;
-                file.statistics = Some(file_stats.as_ref().clone());
+                file.statistics = Some(Arc::clone(&file_stats));
                 result_files.push(file);
                 if !collect_stats {
                     continue;
@@ -404,7 +409,142 @@ pub async fn get_statistics_with_limit(
     Ok((result_files, statistics))
 }
 
-fn add_row_stats(
+/// Generic function to compute statistics across multiple items that have statistics
+fn compute_summary_statistics<T, I>(
+    items: I,
+    file_schema: &SchemaRef,
+    stats_extractor: impl Fn(&T) -> Option<&Statistics>,
+) -> Statistics
+where
+    I: IntoIterator<Item = T>,
+{
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    for (idx, item) in items.into_iter().enumerate() {
+        if let Some(item_stats) = stats_extractor(&item) {
+            if idx == 0 {
+                // First item, set values directly
+                num_rows = item_stats.num_rows;
+                total_byte_size = item_stats.total_byte_size;
+                for (index, column_stats) in
+                    item_stats.column_statistics.iter().enumerate()
+                {
+                    col_stats_set[index].null_count = column_stats.null_count;
+                    col_stats_set[index].max_value = column_stats.max_value.clone();
+                    col_stats_set[index].min_value = column_stats.min_value.clone();
+                    col_stats_set[index].sum_value = column_stats.sum_value.clone();
+                }
+                continue;
+            }
+
+            // Accumulate statistics for subsequent items
+            num_rows = add_row_stats(item_stats.num_rows, num_rows);
+            total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);
+
+            for (item_col_stats, col_stats) in item_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
+                col_stats.null_count =
+                    add_row_stats(item_col_stats.null_count, col_stats.null_count);
+                set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
+                set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
+                col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
+            }
+        }
+    }
+
+    Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    }
+}
+
+/// Computes the summary statistics for a group of files (`FileGroup`-level statistics).
+///
+/// This function combines statistics from all files in the file group to create
+/// summary statistics.
+/// It handles the following aspects:
+/// - Merges row counts and byte sizes across files
+/// - Computes column-level statistics like min/max values
+/// - Maintains appropriate precision information (exact, inexact, absent)
+///
+/// # Parameters
+/// * `file_group` - The group of files to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics (if false, returns the original file group)
+///
+/// # Returns
+/// A new file group with summary statistics attached
+pub fn compute_file_group_statistics(
+    file_group: FileGroup,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+) -> Result<FileGroup> {
+    if !collect_stats {
+        return Ok(file_group);
+    }
+
+    let statistics =
+        compute_summary_statistics(file_group.iter(), &file_schema, |file| {
+            file.statistics.as_ref().map(|stats| stats.as_ref())
+        });
+
+    Ok(file_group.with_statistics(statistics))
+}
+
+/// Computes statistics for all files across multiple file groups.
+///
+/// This function:
+/// 1. Computes statistics for each individual file group
+/// 2. Computes summary statistics across all file groups
+/// 3. Optionally marks the statistics as inexact
+///
+/// # Parameters
+/// * `file_groups` - Vector of file groups to process
+/// * `file_schema` - Schema of the files
+/// * `collect_stats` - Whether to collect statistics
+/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
+///
+/// # Returns
+/// A tuple containing:
+/// * The processed file groups with their individual statistics attached
+/// * The summary statistics across all file groups, i.e. the statistics for all files combined
+pub fn compute_all_files_statistics(
+    file_groups: Vec<FileGroup>,
+    file_schema: SchemaRef,
+    collect_stats: bool,
+    inexact_stats: bool,
+) -> Result<(Vec<FileGroup>, Statistics)> {
+    let mut file_groups_with_stats = Vec::with_capacity(file_groups.len());
+
+    // First compute statistics for each file group
+    for file_group in file_groups {
+        file_groups_with_stats.push(compute_file_group_statistics(
+            file_group,
+            Arc::clone(&file_schema),
+            collect_stats,
+        )?);
+    }
+
+    // Then compute summary statistics across all file groups
+    let mut statistics =
+        compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| {
+            file_group.statistics()
+        });
+
+    if inexact_stats {
+        statistics = statistics.to_inexact()
+    }
+
+    Ok((file_groups_with_stats, statistics))
+}
+
+pub fn add_row_stats(
     file_num_rows: Precision<usize>,
     num_rows: Precision<usize>,
 ) -> Precision<usize> {
@@ -476,3 +616,183 @@ fn set_min_if_lesser(
         _ => {}
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_compute_summary_statistics_basic() {
+        // Create a schema with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col1", DataType::Int32, false),
+            Field::new("col2", DataType::Int32, false),
+        ]));
+
+        // Create items with statistics
+        let stats1 = Statistics {
+            num_rows: Precision::Exact(10),
+            total_byte_size: Precision::Exact(100),
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(1),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                    sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(2),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(200))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
+                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1000))),
+                    distinct_count: Precision::Absent,
+                },
+            ],
+        };
+
+        let stats2 = Statistics {
+            num_rows: Precision::Exact(15),
+            total_byte_size: Precision::Exact(150),
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(2),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(120))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
+                    sum_value: Precision::Exact(ScalarValue::Int32(Some(600))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(3),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(180))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
+                    sum_value: Precision::Exact(ScalarValue::Int32(Some(1200))),
+                    distinct_count: Precision::Absent,
+                },
+            ],
+        };
+
+        let items = vec![Arc::new(stats1), Arc::new(stats2)];
+
+        // Call compute_summary_statistics
+        let summary_stats =
+            compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
+
+        // Verify the results
+        assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15
+        assert_eq!(summary_stats.total_byte_size, Precision::Exact(250)); // 100 + 150
+
+        // Verify column statistics
+        let col1_stats = &summary_stats.column_statistics[0];
+        assert_eq!(col1_stats.null_count, Precision::Exact(3)); // 1 + 2
+        assert_eq!(
+            col1_stats.max_value,
+            Precision::Exact(ScalarValue::Int32(Some(120)))
+        );
+        assert_eq!(
+            col1_stats.min_value,
+            Precision::Exact(ScalarValue::Int32(Some(-10)))
+        );
+        assert_eq!(
+            col1_stats.sum_value,
+            Precision::Exact(ScalarValue::Int32(Some(1100)))
+        ); // 500 + 600
+
+        let col2_stats = &summary_stats.column_statistics[1];
+        assert_eq!(col2_stats.null_count, Precision::Exact(5)); // 2 + 3
+        assert_eq!(
+            col2_stats.max_value,
+            Precision::Exact(ScalarValue::Int32(Some(200)))
+        );
+        assert_eq!(
+            col2_stats.min_value,
+            Precision::Exact(ScalarValue::Int32(Some(5)))
+        );
+        assert_eq!(
+            col2_stats.sum_value,
+            Precision::Exact(ScalarValue::Int32(Some(2200)))
+        ); // 1000 + 1200
+    }
+
+    #[test]
+    fn test_compute_summary_statistics_mixed_precision() {
+        // Create a schema with one column
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col1",
+            DataType::Int32,
+            false,
+        )]));
+
+        // Create items with different precision levels
+        let stats1 = Statistics {
+            num_rows: Precision::Exact(10),
+            total_byte_size: Precision::Inexact(100),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Exact(1),
+                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
+                min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                sum_value: Precision::Exact(ScalarValue::Int32(Some(500))),
+                distinct_count: Precision::Absent,
+            }],
+        };
+
+        let stats2 = Statistics {
+            num_rows: Precision::Inexact(15),
+            total_byte_size: Precision::Exact(150),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Inexact(2),
+                max_value: Precision::Inexact(ScalarValue::Int32(Some(120))),
+                min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+            }],
+        };
+
+        let items = vec![Arc::new(stats1), Arc::new(stats2)];
+
+        let summary_stats =
+            compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
+
+        assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
+        assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
+
+        let col_stats = &summary_stats.column_statistics[0];
+        assert_eq!(col_stats.null_count, Precision::Inexact(3));
+        assert_eq!(
+            col_stats.max_value,
+            Precision::Inexact(ScalarValue::Int32(Some(120)))
+        );
+        assert_eq!(
+            col_stats.min_value,
+            Precision::Inexact(ScalarValue::Int32(Some(-10)))
+        );
+        assert!(matches!(col_stats.sum_value, Precision::Absent));
+    }
+
+    #[test]
+    fn test_compute_summary_statistics_empty() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col1",
+            DataType::Int32,
+            false,
+        )]));
+
+        // Empty collection
+        let items: Vec<Arc<Statistics>> = vec![];
+
+        let summary_stats =
+            compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
+
+        // Verify default values for empty collection
+        assert_eq!(summary_stats.num_rows, Precision::Absent);
+        assert_eq!(summary_stats.total_byte_size, Precision::Absent);
+        assert_eq!(summary_stats.column_statistics.len(), 1);
+        assert_eq!(
+            summary_stats.column_statistics[0].null_count,
+            Precision::Absent
+        );
+    }
+}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index d1141060f9e0..c949e3c9f8cb 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -565,7 +565,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
+            statistics: val
+                .statistics
+                .as_ref()
+                .map(|v| v.try_into().map(Arc::new))
+                .transpose()?,
             extensions: None,
             metadata_size_hint: None,
         })
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c196595eeed4..1384e6c0c32b 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -449,7 +449,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
             .map(|v| v.try_into())
             .collect::<Result<Vec<_>, _>>()?,
         range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
-        statistics: pf.statistics.as_ref().map(|s| s.into()),
+        statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
         })
     }
 }
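Taken together, the new pipeline is: collect files (with `get_files_with_limit` inside `ListingTable`), split them into groups, then attach per-group and overall statistics with `compute_all_files_statistics`. A sketch of that flow under the same crate assumptions as above (paths and sizes are illustrative, and `FileGroup::new` is the existing constructor in `file_groups.rs`):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{Result, Statistics};
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::{compute_all_files_statistics, PartitionedFile};

fn main() -> Result<()> {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));

    // Pretend these came from an object-store listing.
    let files = (0..4)
        .map(|i| {
            PartitionedFile::new(format!("data/part-{i}.parquet"), 4096)
                .with_statistics(Arc::new(Statistics::new_unknown(&schema)))
        })
        .collect::<Vec<_>>();

    // One group containing all files, split across (up to) two target partitions...
    let file_groups = FileGroup::new(files).split_files(2);

    // ...then per-group and overall statistics in one pass. The final flag is
    // the `inexact_stats` marker that `get_files_with_limit` reports when a
    // `limit` stopped the listing early.
    let (groups_with_stats, summary) =
        compute_all_files_statistics(file_groups, schema, true, false)?;

    assert_eq!(groups_with_stats.len(), 2);
    println!("overall row count: {:?}", summary.num_rows);
    Ok(())
}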