From f1bbb1d636650c7f28f52dc507f36e64d71e1aa8 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Sat, 26 Apr 2025 00:08:13 +0800 Subject: [PATCH 1/3] init --- .../core/src/datasource/listing/table.rs | 13 +- datafusion/core/src/datasource/mod.rs | 7 + datafusion/datasource/src/file_groups.rs | 5 + datafusion/datasource/src/schema_adapter.rs | 126 +++++++++++++++++- 4 files changed, 149 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a9834da92e5a..064ebb520e38 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -33,6 +33,7 @@ use crate::execution::context::SessionState; use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; +use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_expr::dml::InsertOp; use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; @@ -1129,7 +1130,17 @@ impl ListingTable { let (file_group, inexact_stats) = get_files_with_limit(files, limit, self.options.collect_stat).await?; - let file_groups = file_group.split_files(self.options.target_partitions); + let mut file_groups = file_group.split_files(self.options.target_partitions); + let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema()) + .map_schema(self.file_schema.as_ref())?; + // Use schema_mapper to map each file-level column statistics to table-level column statistics + file_groups.iter_mut().try_for_each(|file_group| { + if let Some(stat) = file_group.statistics_mut() { + stat.column_statistics = + schema_mapper.map_column_statistics(&stat.column_statistics)?; + } + Ok::<_, DataFusionError>(()) + })?; compute_all_files_statistics( file_groups, self.schema(), diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 25a89644cd2a..818948c8e0eb 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -264,5 +264,12 @@ mod tests { Ok(RecordBatch::try_new(schema, new_columns).unwrap()) } + + fn map_column_statistics( + &self, + _file_col_statistics: &[datafusion_common::ColumnStatistics], + ) -> datafusion_common::Result> { + todo!() + } } } diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 15c86427ed00..5f1def86591b 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -425,6 +425,11 @@ impl FileGroup { self.statistics.as_deref() } + /// Get the mutable reference to the statistics for this group + pub fn statistics_mut(&mut self) -> Option<&mut Statistics> { + self.statistics.as_mut().map(|arc| Arc::make_mut(arc)) + } + /// Partition the list of files into `n` groups pub fn split_files(mut self, n: usize) -> Vec { if self.is_empty() { diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index eafddecd05f5..089c3f0c59d5 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -24,7 +24,7 @@ use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions}; use arrow::compute::{can_cast_types, cast}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::plan_err; +use datafusion_common::{plan_err, ColumnStatistics}; use std::fmt::Debug; use std::sync::Arc; @@ -96,6 +96,12 @@ pub trait SchemaAdapter: Send + Sync { pub trait SchemaMapper: Debug + Send + Sync { /// Adapts a `RecordBatch` to match the `table_schema` fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; + + /// Adapts file-level column `Statistics` to match the `table_schema` + fn map_column_statistics( + &self, + file_col_statistics: &[ColumnStatistics], + ) -> datafusion_common::Result>; } /// Default [`SchemaAdapterFactory`] for mapping schemas. @@ -334,4 +340,122 @@ impl SchemaMapper for SchemaMapping { let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } + + /// Adapts file-level column `Statistics` to match the `table_schema` + fn map_column_statistics( + &self, + file_col_statistics: &[ColumnStatistics], + ) -> datafusion_common::Result> { + let mut table_col_statistics = vec![]; + + // Map the statistics for each field in the file schema to the corresponding field in the + // table schema, if a field is not present in the file schema, we need to fill it with `ColumnStatistics::new_unknown` + for (_, file_col_idx) in self + .projected_table_schema + .fields() + .iter() + .zip(&self.field_mappings) + { + if let Some(file_col_idx) = file_col_idx { + table_col_statistics.push( + file_col_statistics + .get(*file_col_idx) + .cloned() + .unwrap_or_default(), + ); + } else { + table_col_statistics.push(ColumnStatistics::new_unknown()); + } + } + + Ok(table_col_statistics) + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::{DataType, Field}; + use datafusion_common::{stats::Precision, Statistics}; + + use super::*; + + #[test] + fn test_schema_mapping_map_statistics_basic() { + // Create table schema (a, b, c) + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ])); + + // Create file schema (b, a) - different order, missing c + let file_schema = Schema::new(vec![ + Field::new("b", DataType::Utf8, true), + Field::new("a", DataType::Int32, true), + ]); + + // Create SchemaAdapter + let adapter = DefaultSchemaAdapter { + projected_table_schema: Arc::clone(&table_schema), + }; + + // Get mapper and projection + let (mapper, projection) = adapter.map_schema(&file_schema).unwrap(); + + // Should project columns 0,1 from file + assert_eq!(projection, vec![0, 1]); + + // Create file statistics + let mut file_stats = Statistics::default(); + + // Statistics for column b (index 0 in file) + let mut b_stats = ColumnStatistics::default(); + b_stats.null_count = Precision::Exact(5); + + // Statistics for column a (index 1 in file) + let mut a_stats = ColumnStatistics::default(); + a_stats.null_count = Precision::Exact(10); + + file_stats.column_statistics = vec![b_stats, a_stats]; + + // Map statistics + let table_col_stats = mapper + .map_column_statistics(&file_stats.column_statistics) + .unwrap(); + + // Verify stats + assert_eq!(table_col_stats.len(), 3); + assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1 + assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0 + assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown) + } + + #[test] + fn test_schema_mapping_map_statistics_empty() { + // Create schemas + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ])); + let file_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ]); + + let adapter = DefaultSchemaAdapter { + projected_table_schema: Arc::clone(&table_schema), + }; + let (mapper, _) = adapter.map_schema(&file_schema).unwrap(); + + // Empty file statistics + let file_stats = Statistics::default(); + let table_col_stats = mapper + .map_column_statistics(&file_stats.column_statistics) + .unwrap(); + + // All stats should be unknown + assert_eq!(table_col_stats.len(), 2); + assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),); + assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),); + } } From df1db6afa8083b15f516c4ecde550fe855076c94 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Sun, 27 Apr 2025 17:51:20 +0800 Subject: [PATCH 2/3] fix clippy --- datafusion/common/src/stats.rs | 3 +++ datafusion/core/src/datasource/mod.rs | 2 +- datafusion/datasource/src/file_groups.rs | 2 +- datafusion/datasource/src/schema_adapter.rs | 12 ++++++++---- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 807d885b3a4d..82e9f23a7278 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -451,6 +451,9 @@ impl Statistics { /// Summarize zero or more statistics into a single `Statistics` instance. /// + /// The method assumes that all statistics are for the same schema. + /// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent. + /// /// Returns an error if the statistics do not match the specified schemas. pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result where diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 818948c8e0eb..674541ff73a5 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -269,7 +269,7 @@ mod tests { &self, _file_col_statistics: &[datafusion_common::ColumnStatistics], ) -> datafusion_common::Result> { - todo!() + unimplemented!() } } } diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 5f1def86591b..929787e436c8 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -427,7 +427,7 @@ impl FileGroup { /// Get the mutable reference to the statistics for this group pub fn statistics_mut(&mut self) -> Option<&mut Statistics> { - self.statistics.as_mut().map(|arc| Arc::make_mut(arc)) + self.statistics.as_mut().map(Arc::make_mut) } /// Partition the list of files into `n` groups diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index 089c3f0c59d5..bacec7f4f9f0 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -409,12 +409,16 @@ mod tests { let mut file_stats = Statistics::default(); // Statistics for column b (index 0 in file) - let mut b_stats = ColumnStatistics::default(); - b_stats.null_count = Precision::Exact(5); + let b_stats = ColumnStatistics { + null_count: Precision::Exact(5), + ..Default::default() + }; // Statistics for column a (index 1 in file) - let mut a_stats = ColumnStatistics::default(); - a_stats.null_count = Precision::Exact(10); + let a_stats = ColumnStatistics { + null_count: Precision::Exact(10), + ..Default::default() + }; file_stats.column_statistics = vec![b_stats, a_stats]; From ab25831aaf67b4f1c41a4db4b8ac492b5cf8f908 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 29 Apr 2025 14:39:05 +0800 Subject: [PATCH 3/3] add test --- Cargo.lock | 61 +++++++++---------- .../test_files/listing_table_statistics.slt | 47 ++++++++++++++ 2 files changed, 75 insertions(+), 33 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/listing_table_statistics.slt diff --git a/Cargo.lock b/Cargo.lock index 07efb0a96d39..3ef84254b8df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -610,9 +610,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.6.1" +version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c39646d1a6b51240a1a23bb57ea4eebede7e16fbc237fdc876980233dcecb4f" +checksum = "b6fcc63c9860579e4cb396239570e979376e70aab79e496621748a09913f8b36" dependencies = [ "aws-credential-types", "aws-runtime", @@ -640,9 +640,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.2" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4471bef4c22a06d2c7a1b6492493d3fdf24a805323109d6874f9c94d5906ac14" +checksum = "687bc16bc431a8533fe0097c7f0182874767f920989d7260950172ae8e3c4465" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -675,9 +675,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.6" +version = "1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aff45ffe35196e593ea3b9dd65b320e51e2dda95aff4390bc459e461d09c6ad" +checksum = "6c4063282c69991e57faab9e5cb21ae557e59f5b0fb285c196335243df8dc25c" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -691,7 +691,6 @@ dependencies = [ "fastrand", "http 0.2.12", "http-body 0.4.6", - "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -700,9 +699,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.63.0" +version = "1.65.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1cb45b83b53b5cd55ee33fd9fd8a70750255a3f286e4dca20e882052f2b256f" +checksum = "8efec445fb78df585327094fcef4cad895b154b58711e504db7a93c41aa27151" dependencies = [ "aws-credential-types", "aws-runtime", @@ -723,9 +722,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.64.0" +version = "1.66.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8d4d9bc075ea6238778ed3951b65d3cde8c3864282d64fdcd19f2a90c0609f1" +checksum = "5e49cca619c10e7b002dc8e66928ceed66ab7f56c1a3be86c5437bf2d8d89bba" dependencies = [ "aws-credential-types", "aws-runtime", @@ -746,9 +745,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.64.0" +version = "1.66.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819ccba087f403890fee4825eeab460e64c59345667d2b83a12cf544b581e3a7" +checksum = "7420479eac0a53f776cc8f0d493841ffe58ad9d9783f3947be7265784471b47a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -770,9 +769,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d03c3c05ff80d54ff860fe38c726f6f494c639ae975203a101335f223386db" +checksum = "3503af839bd8751d0bdc5a46b9cac93a003a353e635b0c12cf2376b5b53e41ea" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -784,7 +783,6 @@ dependencies = [ "hmac", "http 0.2.12", "http 1.2.0", - "once_cell", "percent-encoding", "sha2", "time", @@ -804,9 +802,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.0" +version = "0.62.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5949124d11e538ca21142d1fba61ab0a2a2c1bc3ed323cdb3e4b878bfb83166" +checksum = "99335bec6cdc50a346fda1437f9fefe33abf8c99060739a546a16457f2862ca9" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -816,7 +814,6 @@ dependencies = [ "http 0.2.12", "http 1.2.0", "http-body 0.4.6", - "once_cell", "percent-encoding", "pin-project-lite", "pin-utils", @@ -825,9 +822,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0497ef5d53065b7cd6a35e9c1654bd1fefeae5c52900d91d1b188b0af0f29324" +checksum = "8aff1159006441d02e57204bf57a1b890ba68bedb6904ffd2873c1c4c11c546b" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -857,12 +854,11 @@ dependencies = [ [[package]] name = "aws-smithy-observability" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445d065e76bc1ef54963db400319f1dd3ebb3e0a74af20f7f7630625b0cc7cc0" +checksum = "9364d5989ac4dd918e5cc4c4bdcc61c9be17dcd2586ea7f69e348fc7c6cab393" dependencies = [ "aws-smithy-runtime-api", - "once_cell", ] [[package]] @@ -877,9 +873,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.1" +version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0152749e17ce4d1b47c7747bdfec09dac1ccafdcbc741ebf9daa2a373356730f" +checksum = "14302f06d1d5b7d333fd819943075b13d27c7700b414f574c3c35859bfb55d5e" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -893,7 +889,6 @@ dependencies = [ "http 1.2.0", "http-body 0.4.6", "http-body 1.0.1", - "once_cell", "pin-project-lite", "pin-utils", "tokio", @@ -902,9 +897,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.4" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3da37cf5d57011cb1753456518ec76e31691f1f474b73934a284eb2a1c76510f" +checksum = "a1e5d9e3a80a18afa109391fb5ad09c3daf887b516c6fd805a157c6ea7994a57" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -919,9 +914,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836155caafba616c0ff9b07944324785de2ab016141c3550bd1c07882f8cee8f" +checksum = "40076bd09fadbc12d5e026ae080d0930defa606856186e31d83ccc6a255eeaf3" dependencies = [ "base64-simd", "bytes", @@ -951,9 +946,9 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.6" +version = "1.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3873f8deed8927ce8d04487630dc9ff73193bab64742a61d050e57a68dec4125" +checksum = "8a322fec39e4df22777ed3ad8ea868ac2f94cd15e1a55f6ee8d8d6305057689a" dependencies = [ "aws-credential-types", "aws-smithy-async", diff --git a/datafusion/sqllogictest/test_files/listing_table_statistics.slt b/datafusion/sqllogictest/test_files/listing_table_statistics.slt new file mode 100644 index 000000000000..aeeaaea6c2a3 --- /dev/null +++ b/datafusion/sqllogictest/test_files/listing_table_statistics.slt @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test file with different schema order but genenrating correct statistics for table +statement ok +COPY (SELECT * FROM values (1, 'a'), (2, 'b') t(int_col, str_col)) to '/tmp/table/1.parquet'; + +statement ok +COPY (SELECT * FROM values ('c', 3), ('d', -1) t(str_col, int_col)) to '/tmp/table/2.parquet'; + +statement ok +set datafusion.execution.collect_statistics = true; + +statement ok +set datafusion.explain.show_statistics = true; + +statement ok +create external table t stored as parquet location '/tmp/table'; + +query TT +explain format indent select * from t; +---- +logical_plan TableScan: t projection=[int_col, str_col] +physical_plan DataSourceExec: file_groups={2 groups: [[tmp/table/1.parquet], [tmp/table/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(288), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]] + +statement ok +drop table t; + +statement ok +set datafusion.execution.collect_statistics = false; + +statement ok +set datafusion.explain.show_statistics = false; \ No newline at end of file