
Commit 54302ac

Map file-level column statistics to the table-level (#15865)
* init
* fix clippy
* add test
1 parent 5e1214c commit 54302ac

File tree

6 files changed: +203 -2 lines changed

datafusion/common/src/stats.rs (+3)

@@ -451,6 +451,9 @@ impl Statistics {
 
     /// Summarize zero or more statistics into a single `Statistics` instance.
     ///
+    /// This method assumes that all statistics are for the same schema.
+    /// If not, you may call `SchemaMapper::map_column_statistics` first to make them consistent.
+    ///
     /// Returns an error if the statistics do not match the specified schemas.
     pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
     where
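The new wording in the `try_merge_iter` docs points at the intended call pattern: align every file's column statistics with the table schema before merging. Below is a minimal, hedged sketch of that pattern; the helper name `merge_file_stats` is illustrative (not part of this commit), and it assumes `try_merge_iter` accepts an iterator of `&Statistics`, as its signature above suggests.

```rust
use arrow::datatypes::Schema;
use datafusion_common::{Result, Statistics};
use datafusion_datasource::schema_adapter::SchemaMapper;

/// Illustrative helper: align each file's statistics with the table schema via
/// `SchemaMapper::map_column_statistics`, then merge them into one `Statistics`.
fn merge_file_stats(
    schema_mapper: &dyn SchemaMapper,
    file_stats: &[Statistics],
    table_schema: &Schema,
) -> Result<Statistics> {
    let aligned: Vec<Statistics> = file_stats
        .iter()
        .map(|stats| {
            let mut stats = stats.clone();
            // Reorder / fill the per-column statistics so they follow the table schema
            stats.column_statistics =
                schema_mapper.map_column_statistics(&stats.column_statistics)?;
            Ok(stats)
        })
        .collect::<Result<Vec<_>>>()?;

    // Every entry now describes the same (table) schema, as `try_merge_iter` assumes
    Statistics::try_merge_iter(aligned.iter(), table_schema)
}
```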

datafusion/core/src/datasource/listing/table.rs (+12 -1)

@@ -33,6 +33,7 @@ use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
 use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
@@ -1148,7 +1149,17 @@ impl ListingTable {
         let (file_group, inexact_stats) =
             get_files_with_limit(files, limit, self.options.collect_stat).await?;
 
-        let file_groups = file_group.split_files(self.options.target_partitions);
+        let mut file_groups = file_group.split_files(self.options.target_partitions);
+        let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
+            .map_schema(self.file_schema.as_ref())?;
+        // Use schema_mapper to map each file group's column statistics to table-level column statistics
+        file_groups.iter_mut().try_for_each(|file_group| {
+            if let Some(stat) = file_group.statistics_mut() {
+                stat.column_statistics =
+                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
+            }
+            Ok::<_, DataFusionError>(())
+        })?;
         compute_all_files_statistics(
             file_groups,
             self.schema(),
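One small Rust note on the loop above: `try_for_each` short-circuits on the first `Err`, and the closing `Ok::<_, DataFusionError>(())` turbofish spells out the closure's result type so the inner `?` and the outer `})?` agree on the error type. A tiny self-contained illustration of the same early-exit pattern; the function and values here are made up:

```rust
use datafusion_common::{DataFusionError, Result};

/// Made-up example: halve every value in place, stopping at the first odd one.
fn halve_all(values: &mut [i64]) -> Result<()> {
    values.iter_mut().try_for_each(|v| {
        if *v % 2 != 0 {
            // `try_for_each` returns this error immediately, skipping the rest
            return Err(DataFusionError::Plan(format!("{v} is not even")));
        }
        *v /= 2;
        // Same convention as the listing-table loop above
        Ok::<_, DataFusionError>(())
    })
}
```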

datafusion/core/src/datasource/mod.rs (+7)

@@ -264,5 +264,12 @@ mod tests {
 
             Ok(RecordBatch::try_new(schema, new_columns).unwrap())
         }
+
+        fn map_column_statistics(
+            &self,
+            _file_col_statistics: &[datafusion_common::ColumnStatistics],
+        ) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
+            unimplemented!()
+        }
     }
 }

datafusion/datasource/src/file_groups.rs (+5)

@@ -425,6 +425,11 @@ impl FileGroup {
         self.statistics.as_deref()
     }
 
+    /// Get a mutable reference to the statistics for this group
+    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
+        self.statistics.as_mut().map(Arc::make_mut)
+    }
+
     /// Partition the list of files into `n` groups
     pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
         if self.is_empty() {
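`statistics_mut` goes through `Arc::make_mut`, so the statistics are copy-on-write: if the `Arc<Statistics>` is shared, the data is cloned before a mutable reference is handed out, and other holders of the original `Arc` are unaffected. A minimal standard-library illustration of that behaviour (not DataFusion code):

```rust
use std::sync::Arc;

fn main() {
    let original = Arc::new(vec![1, 2, 3]);
    let mut handle = Arc::clone(&original);

    // The Arc is shared, so `make_mut` clones the Vec before mutating it
    Arc::make_mut(&mut handle).push(4);

    assert_eq!(*original, vec![1, 2, 3]); // untouched
    assert_eq!(*handle, vec![1, 2, 3, 4]); // mutated copy
}
```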

datafusion/datasource/src/schema_adapter.rs (+129 -1)

@@ -24,7 +24,7 @@
 use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
 use arrow::compute::{can_cast_types, cast};
 use arrow::datatypes::{Schema, SchemaRef};
-use datafusion_common::plan_err;
+use datafusion_common::{plan_err, ColumnStatistics};
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -96,6 +96,12 @@ pub trait SchemaAdapter: Send + Sync {
 pub trait SchemaMapper: Debug + Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+
+    /// Adapts file-level column `Statistics` to match the `table_schema`
+    fn map_column_statistics(
+        &self,
+        file_col_statistics: &[ColumnStatistics],
+    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
 }
 
 /// Default [`SchemaAdapterFactory`] for mapping schemas.
@@ -334,4 +340,126 @@ impl SchemaMapper for SchemaMapping {
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
+
+    /// Adapts file-level column `Statistics` to match the `table_schema`
+    fn map_column_statistics(
+        &self,
+        file_col_statistics: &[ColumnStatistics],
+    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
+        let mut table_col_statistics = vec![];
+
+        // Map each field of the projected table schema to the statistics of the corresponding
+        // field in the file schema; fields missing from the file get `ColumnStatistics::new_unknown`
+        for (_, file_col_idx) in self
+            .projected_table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+        {
+            if let Some(file_col_idx) = file_col_idx {
+                table_col_statistics.push(
+                    file_col_statistics
+                        .get(*file_col_idx)
+                        .cloned()
+                        .unwrap_or_default(),
+                );
+            } else {
+                table_col_statistics.push(ColumnStatistics::new_unknown());
+            }
+        }
+
+        Ok(table_col_statistics)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::{DataType, Field};
+    use datafusion_common::{stats::Precision, Statistics};
+
+    use super::*;
+
+    #[test]
+    fn test_schema_mapping_map_statistics_basic() {
+        // Create table schema (a, b, c)
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Float64, true),
+        ]));
+
+        // Create file schema (b, a) - different order, missing c
+        let file_schema = Schema::new(vec![
+            Field::new("b", DataType::Utf8, true),
+            Field::new("a", DataType::Int32, true),
+        ]);
+
+        // Create SchemaAdapter
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+
+        // Get mapper and projection
+        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();
+
+        // Should project columns 0, 1 from the file
+        assert_eq!(projection, vec![0, 1]);
+
+        // Create file statistics
+        let mut file_stats = Statistics::default();
+
+        // Statistics for column b (index 0 in file)
+        let b_stats = ColumnStatistics {
+            null_count: Precision::Exact(5),
+            ..Default::default()
+        };
+
+        // Statistics for column a (index 1 in file)
+        let a_stats = ColumnStatistics {
+            null_count: Precision::Exact(10),
+            ..Default::default()
+        };
+
+        file_stats.column_statistics = vec![b_stats, a_stats];
+
+        // Map statistics
+        let table_col_stats = mapper
+            .map_column_statistics(&file_stats.column_statistics)
+            .unwrap();
+
+        // Verify stats
+        assert_eq!(table_col_stats.len(), 3);
+        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
+        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
+        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
+    }
+
+    #[test]
+    fn test_schema_mapping_map_statistics_empty() {
+        // Create schemas
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]));
+        let file_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();
+
+        // Empty file statistics
+        let file_stats = Statistics::default();
+        let table_col_stats = mapper
+            .map_column_statistics(&file_stats.column_statistics)
+            .unwrap();
+
+        // All stats should be unknown
+        assert_eq!(table_col_stats.len(), 2);
+        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
+        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
+    }
 }
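Outside this module the mapper is usually obtained through `DefaultSchemaAdapterFactory` rather than by constructing `DefaultSchemaAdapter` directly, mirroring the `ListingTable` change above. A hedged sketch of that call path; `map_file_stats_to_table` is an illustrative name and the schemas are made up:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{ColumnStatistics, Result};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;

/// Illustrative only: map one file's column statistics into table-schema order.
fn map_file_stats_to_table(
    file_col_stats: &[ColumnStatistics],
) -> Result<Vec<ColumnStatistics>> {
    // Table schema (a, b); the file only contains b
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);

    let (mapper, _projection) = DefaultSchemaAdapterFactory::from_schema(table_schema)
        .map_schema(&file_schema)?;

    // The result is in table order: index 0 (`a`) is `ColumnStatistics::new_unknown()`,
    // index 1 (`b`) carries the file's statistics
    mapper.map_column_statistics(file_col_stats)
}
```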
New sqllogictest file (+47)

@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test files with different schema order but generating correct statistics for the table
+statement ok
+COPY (SELECT * FROM values (1, 'a'), (2, 'b') t(int_col, str_col)) to '/tmp/table/1.parquet';
+
+statement ok
+COPY (SELECT * FROM values ('c', 3), ('d', -1) t(str_col, int_col)) to '/tmp/table/2.parquet';
+
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+statement ok
+set datafusion.explain.show_statistics = true;
+
+statement ok
+create external table t stored as parquet location '/tmp/table';
+
+query TT
+explain format indent select * from t;
+----
+logical_plan TableScan: t projection=[int_col, str_col]
+physical_plan DataSourceExec: file_groups={2 groups: [[tmp/table/1.parquet], [tmp/table/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(288), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]]
+
+statement ok
+drop table t;
+
+statement ok
+set datafusion.execution.collect_statistics = false;
+
+statement ok
+set datafusion.explain.show_statistics = false;
