Skip to content

Support computing statistics for FileGroup #15432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 98 additions & 23 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@

//! The table implementation.

use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
use super::{ListingTableUrl, PartitionedFile};
use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use crate::datasource::{
create_ordering,
file_format::{
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
},
get_statistics_with_limit,
physical_plan::FileSinkConfig,
};
use crate::execution::context::SessionState;
Expand All @@ -55,9 +53,12 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::stats::Precision;
use datafusion_datasource::add_row_stats;
use datafusion_datasource::compute_all_files_statistics;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;

Expand Down Expand Up @@ -1115,32 +1116,26 @@ impl ListingTable {
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
Ok((part_file, statistics))
let statistics = if self.options.collect_stat {
self.do_collect_statistics(ctx, &store, &part_file).await?
} else {
Ok((
part_file,
Arc::new(Statistics::new_unknown(&self.file_schema)),
))
}
Arc::new(Statistics::new_unknown(&self.file_schema))
};
Ok(part_file.with_statistics(statistics))
})
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

let (files, statistics) = get_statistics_with_limit(
files,
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let file_groups = file_group.split_files(self.options.target_partitions);
compute_all_files_statistics(
file_groups,
self.schema(),
limit,
self.options.collect_stat,
inexact_stats,
)
.await?;

Ok((
files.split_files(self.options.target_partitions),
statistics,
))
}

/// Collects statistics for a given partitioned file.
Expand Down Expand Up @@ -1182,6 +1177,86 @@ impl ListingTable {
}
}

/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
///
/// This function collects files from the provided stream until either:
/// 1. The stream is exhausted
/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
///
/// # Arguments
/// * `files` - A stream of `Result<PartitionedFile>` items to process
/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
///   once the accumulated number of rows exceeds this limit
/// * `collect_stats` - Whether to collect and accumulate statistics from the files
///
/// # Returns
/// A `Result` containing a `FileGroup` with the collected files
/// and a boolean indicating whether the statistics are inexact.
///
/// # Note
/// The function will continue processing files if statistics are not available or if the
/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
/// but files will still be collected.
async fn get_files_with_limit(
    files: impl Stream<Item = Result<PartitionedFile>>,
    limit: Option<usize>,
    collect_stats: bool,
) -> Result<(FileGroup, bool)> {
    let mut file_group = FileGroup::default();
    // Fusing the stream allows us to call next safely even once it is finished.
    let mut all_files = Box::pin(files.fuse());
    let mut num_rows = Precision::<usize>::Absent;
    // Handle the first file separately so `num_rows` is seeded from its
    // statistics before the accumulation loop below.
    //
    // NOTE(review): this must be `if let`, not `while let`. With `while let`,
    // a `break` out of the inner loop (limit reached) would re-enter here,
    // reset `num_rows` from the next file's statistics, and keep collecting
    // files past the limit; it would also guarantee the stream is exhausted
    // before the `inexact_stats` probe below, so inexactness could never be
    // reported.
    if let Some(first_file) = all_files.next().await {
        let file = first_file?;
        if let Some(file_statistic) = &file.statistics {
            num_rows = file_statistic.num_rows;
        }
        file_group.push(file);

        // If the number of rows exceeds the limit, we can stop processing
        // files. This only applies when we know the number of rows. It also
        // currently ignores tables that have no statistics regarding the
        // number of rows.
        let conservative_num_rows = match num_rows {
            Precision::Exact(nr) => nr,
            _ => usize::MIN,
        };
        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
            while let Some(current) = all_files.next().await {
                let file = current?;
                if !collect_stats {
                    file_group.push(file);
                    continue;
                }

                // We accumulate the number of rows across all the files in
                // question. If any file does not provide any information or
                // provides an inexact value, `add_row_stats` demotes the
                // precision to inexact.
                if let Some(file_stats) = &file.statistics {
                    num_rows = add_row_stats(num_rows, file_stats.num_rows);
                }
                file_group.push(file);

                // If the number of rows exceeds the limit, we can stop
                // processing files. This only applies when we know the number
                // of rows (exactly or inexactly). Files left unread in the
                // stream are detected by the probe below.
                if num_rows.get_value().unwrap_or(&usize::MIN)
                    > &limit.unwrap_or(usize::MAX)
                {
                    break;
                }
            }
        }
    }
    // If we still have files in the stream, it means that the limit kicked
    // in, and the statistic could have been different had we processed the
    // files in a different order.
    let inexact_stats = all_files.next().await.is_some();
    Ok((file_group, inexact_stats))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub use datafusion_catalog::default_table_source;
pub use datafusion_catalog::memory;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::get_statistics_with_limit;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::sink;
pub use datafusion_datasource::source;
Expand Down
5 changes: 5 additions & 0 deletions datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ impl FileGroup {
self.files.push(file);
}

/// Returns the statistics attached to this group, if any.
///
/// Returns `None` when no statistics have been set on the group —
/// presumably when statistics collection was disabled or has not yet
/// run for these files; confirm against callers.
pub fn statistics(&self) -> Option<&Statistics> {
    self.statistics.as_ref()
}

/// Partition the list of files into `n` groups
pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
if self.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@ mod tests {
},
partition_values: vec![ScalarValue::from(file.date)],
range: None,
statistics: Some(Statistics {
statistics: Some(Arc::new(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: file
Expand All @@ -2020,7 +2020,7 @@ mod tests {
.unwrap_or_default()
})
.collect::<Vec<_>>(),
}),
})),
extensions: None,
metadata_size_hint: None,
}
Expand Down
11 changes: 9 additions & 2 deletions datafusion/datasource/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use statistics::get_statistics_with_limit;
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;

/// Stream of files get listed from object store
pub type PartitionedFileStream =
Expand Down Expand Up @@ -107,7 +108,7 @@ pub struct PartitionedFile {
///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
pub statistics: Option<Statistics>,
pub statistics: Option<Arc<Statistics>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for adding Arc

/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
/// The estimated size of the parquet metadata, in bytes
Expand Down Expand Up @@ -187,6 +188,12 @@ impl PartitionedFile {
self.extensions = Some(extensions);
self
}

// Update the statistics for this file.
pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
self.statistics = Some(statistics);
self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
Loading