Map file-level column statistics to the table-level #15865

Merged
merged 3 commits into from
Apr 29, 2025
61 changes: 28 additions & 33 deletions Cargo.lock

3 changes: 3 additions & 0 deletions datafusion/common/src/stats.rs
@@ -451,6 +451,9 @@ impl Statistics {

    /// Summarize zero or more statistics into a single `Statistics` instance.
    ///
    /// The method assumes that all statistics are for the same schema.
    /// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
    ///
    /// Returns an error if the statistics do not match the specified schemas.
    pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
    where
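The doc comment above says the merge assumes every input describes the same schema. A minimal sketch of why that matters, using simplified stand-in types rather than DataFusion's own: `ColStats` and `merge_stats` are hypothetical names, and the real `try_merge_iter` works on `Statistics` and a `Schema`. Columns are merged position by position, so an input with a different column count is rejected.

```rust
/// Hypothetical, simplified stand-in for per-column statistics.
#[derive(Debug, Clone, PartialEq)]
struct ColStats {
    null_count: Option<usize>,
    min: Option<i64>,
    max: Option<i64>,
}

impl ColStats {
    /// Statistics about which nothing is known.
    fn unknown() -> Self {
        ColStats { null_count: None, min: None, max: None }
    }
}

/// Merge per-file column statistics position by position.
/// Every input must have exactly `num_columns` entries, in the same column order.
fn merge_stats(files: &[Vec<ColStats>], num_columns: usize) -> Result<Vec<ColStats>, String> {
    let mut merged: Option<Vec<ColStats>> = None;
    for (i, file) in files.iter().enumerate() {
        if file.len() != num_columns {
            return Err(format!(
                "file {} has {} columns, expected {}",
                i,
                file.len(),
                num_columns
            ));
        }
        merged = Some(match merged {
            // The first input becomes the starting accumulator.
            None => file.clone(),
            // Later inputs are combined column by column; a value stays
            // known only if it is known on both sides.
            Some(acc) => acc
                .iter()
                .zip(file)
                .map(|(a, b)| ColStats {
                    null_count: a.null_count.zip(b.null_count).map(|(x, y)| x + y),
                    min: a.min.zip(b.min).map(|(x, y)| x.min(y)),
                    max: a.max.zip(b.max).map(|(x, y)| x.max(y)),
                })
                .collect(),
        });
    }
    // With no inputs at all, every column is unknown.
    Ok(merged.unwrap_or_else(|| vec![ColStats::unknown(); num_columns]))
}

fn main() {
    let f1 = vec![ColStats { null_count: Some(1), min: Some(0), max: Some(10) }];
    let f2 = vec![ColStats { null_count: Some(2), min: Some(-5), max: Some(7) }];
    let merged = merge_stats(&[f1, f2], 1).unwrap();
    println!("{:?}", merged);
}
```

Because the merge is purely positional, two files that list the same columns in a different order would merge silently into wrong results, which is why the statistics need to be aligned to one schema first.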
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -33,6 +33,7 @@ use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
@@ -1129,7 +1130,17 @@ impl ListingTable {
        let (file_group, inexact_stats) =
            get_files_with_limit(files, limit, self.options.collect_stat).await?;

        let file_groups = file_group.split_files(self.options.target_partitions);
        let mut file_groups = file_group.split_files(self.options.target_partitions);
        let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
Contributor

I think using the default schema mapper makes sense for now / in this PR, but in general I think it would make sense to allow the user to provide their own schema mapping rules here (so a default value that is not NULL can be used, for example) via their own mapper.

However, we would have to add a schema mapper factory to ListingOptions:

https://github.com/apache/datafusion/blob/f1bbb1d636650c7f28f52dc507f36e64d71e1aa8/datafusion/core/src/datasource/listing/table.rs#L256-L255

(this is not a change needed for this PR, I just noticed it while reviewing this PR)

Member Author

Makes sense. I filed an issue to track it: #15889

Comment on lines +1133 to +1134
Member Author

@alamb While working on #15852, I found that the listing table does not actually have the issue described in #15689. All files here already share the same schema, because when the table is created, every fetched file already goes through the SchemaMapper to reorder its schema. See: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/opener.rs#L206.

What we should fix instead is making the file schema match the listing table schema. Usually, if users specify partition columns, the table schema carries the extra partition column information. So I moved the mapper down to the compute_all_files_statistics method in commit 689fc66.

            .map_schema(self.file_schema.as_ref())?;
        // Use schema_mapper to map each file-level column statistics to table-level column statistics
        file_groups.iter_mut().try_for_each(|file_group| {
            if let Some(stat) = file_group.statistics_mut() {
                stat.column_statistics =
                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
            }
            Ok::<_, DataFusionError>(())
        })?;
        compute_all_files_statistics(
            file_groups,
            self.schema(),
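The hunk above rewrites each file group's column statistics so they line up with the table schema, which may contain columns (such as partition columns) that the file does not. A rough, self-contained sketch of that idea, under stated assumptions: `ColStats`, `map_column_stats`, and the `field_mappings` slice are hypothetical stand-ins, not DataFusion's `SchemaMapper` API. Each table column either points at a file column or has no counterpart, in which case its statistics are treated as unknown.

```rust
/// Hypothetical, simplified stand-in for per-column statistics.
#[derive(Debug, Clone, PartialEq)]
struct ColStats {
    null_count: Option<usize>,
    min: Option<i64>,
    max: Option<i64>,
}

impl ColStats {
    /// Statistics about which nothing is known.
    fn unknown() -> Self {
        ColStats { null_count: None, min: None, max: None }
    }
}

/// Produce one `ColStats` per table column.
/// `field_mappings[i]` is the position of table column `i` in the file,
/// or `None` if the file does not contain that column.
fn map_column_stats(
    field_mappings: &[Option<usize>],
    file_stats: &[ColStats],
) -> Result<Vec<ColStats>, String> {
    field_mappings
        .iter()
        .map(|mapping| match mapping {
            // Table column exists in the file: reuse the file-level statistics.
            Some(file_idx) => file_stats
                .get(*file_idx)
                .cloned()
                .ok_or_else(|| format!("no statistics for file column {}", file_idx)),
            // Table column is absent from the file: nothing is known about it.
            None => Ok(ColStats::unknown()),
        })
        .collect()
}

fn main() {
    // File columns: (b, a). Table columns: (a, b, part), where `part`
    // stands for a partition column that is not stored in the file.
    let file_stats = vec![
        ColStats { null_count: Some(0), min: Some(1), max: Some(9) },  // b
        ColStats { null_count: Some(2), min: Some(-3), max: Some(4) }, // a
    ];
    let field_mappings: [Option<usize>; 3] = [Some(1), Some(0), None];
    let table_stats = map_column_stats(&field_mappings, &file_stats).unwrap();
    println!("{:?}", table_stats);
}
```

After this step every group reports statistics in table-column order, so a positional merge across groups compares like with like.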
7 changes: 7 additions & 0 deletions datafusion/core/src/datasource/mod.rs
@@ -264,5 +264,12 @@ mod tests {

            Ok(RecordBatch::try_new(schema, new_columns).unwrap())
        }

        fn map_column_statistics(
            &self,
            _file_col_statistics: &[datafusion_common::ColumnStatistics],
        ) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
            unimplemented!()
        }
    }
}
5 changes: 5 additions & 0 deletions datafusion/datasource/src/file_groups.rs
@@ -425,6 +425,11 @@ impl FileGroup {
        self.statistics.as_deref()
    }

    /// Get the mutable reference to the statistics for this group
    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
        self.statistics.as_mut().map(Arc::make_mut)
    }

    /// Partition the list of files into `n` groups
    pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
        if self.is_empty() {
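The new `statistics_mut` accessor relies on `Arc::make_mut`, which gives copy-on-write behavior: if the `Arc` is shared, the inner value is cloned before it is mutated, so other holders keep seeing the old value. A small sketch using only the standard library; `Stats` and `Group` are hypothetical stand-ins for `Statistics` and `FileGroup`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `Statistics`.
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    num_rows: usize,
}

/// Hypothetical stand-in for `FileGroup`, holding optional shared statistics.
struct Group {
    statistics: Option<Arc<Stats>>,
}

impl Group {
    /// Same shape as the accessor in the diff: clone-on-write access
    /// to the statistics, if any are present.
    fn statistics_mut(&mut self) -> Option<&mut Stats> {
        self.statistics.as_mut().map(Arc::make_mut)
    }
}

fn main() {
    let shared = Arc::new(Stats { num_rows: 10 });
    let mut group = Group { statistics: Some(Arc::clone(&shared)) };

    // The Arc is shared, so `make_mut` clones the inner value before mutating.
    if let Some(stats) = group.statistics_mut() {
        stats.num_rows = 20;
    }

    // The other holder still sees the original value.
    assert_eq!(shared.num_rows, 10);
    assert_eq!(group.statistics.as_ref().unwrap().num_rows, 20);
}
```

When the `Arc` has a single owner, `make_mut` mutates in place without cloning, so the accessor only pays for a copy when the statistics are actually shared.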