Skip to content

Commit 6743003

Browse files
committed
enhance: Remove redundant statistics from FileScanConfig
Signed-off-by: Alan Tang <[email protected]>
1 parent 8356c94 commit 6743003

File tree

3 files changed

+78
-52
lines changed

3 files changed

+78
-52
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,6 @@ pub struct FileScanConfig {
148148
pub file_groups: Vec<Vec<PartitionedFile>>,
149149
/// Table constraints
150150
pub constraints: Constraints,
151-
/// Estimated overall statistics of the files, taking `filters` into account.
152-
/// Defaults to [`Statistics::new_unknown`].
153-
pub statistics: Statistics,
154151
/// Columns on which to project the data. Indexes that are higher than the
155152
/// number of columns of `file_schema` refer to `table_partition_cols`.
156153
pub projection: Option<Vec<usize>>,
@@ -330,7 +327,6 @@ impl FileScanConfig {
330327
file_schema,
331328
file_groups: vec![],
332329
constraints: Constraints::empty(),
333-
statistics,
334330
projection: None,
335331
limit: None,
336332
table_partition_cols: vec![],
@@ -339,14 +335,15 @@ impl FileScanConfig {
339335
new_lines_in_values: false,
340336
file_source: Arc::clone(&file_source),
341337
};
338+
config = config.with_statistics(statistics);
342339

343-
config = config.with_source(Arc::clone(&file_source));
344340
config
345341
}
346342

347343
/// Set the file source
348344
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
349-
self.file_source = file_source.with_statistics(self.statistics.clone());
345+
self.file_source =
346+
file_source.with_statistics(Statistics::new_unknown(&self.file_schema));
350347
self
351348
}
352349

@@ -358,7 +355,6 @@ impl FileScanConfig {
358355

359356
/// Set the statistics of the files
360357
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
361-
self.statistics = statistics.clone();
362358
self.file_source = self.file_source.with_statistics(statistics);
363359
self
364360
}
@@ -373,10 +369,7 @@ impl FileScanConfig {
373369
}
374370

375371
fn projected_stats(&self) -> Statistics {
376-
let statistics = self
377-
.file_source
378-
.statistics()
379-
.unwrap_or(self.statistics.clone());
372+
let statistics = self.file_source.statistics().unwrap();
380373

381374
let table_cols_stats = self
382375
.projection_indices()
@@ -509,7 +502,7 @@ impl FileScanConfig {
509502
return (
510503
Arc::clone(&self.file_schema),
511504
self.constraints.clone(),
512-
self.statistics.clone(),
505+
self.file_source.statistics().unwrap().clone(),
513506
self.output_ordering.clone(),
514507
);
515508
}
@@ -651,7 +644,11 @@ impl Debug for FileScanConfig {
651644
write!(f, "FileScanConfig {{")?;
652645
write!(f, "object_store_url={:?}, ", self.object_store_url)?;
653646

654-
write!(f, "statistics={:?}, ", self.statistics)?;
647+
write!(
648+
f,
649+
"statistics={:?}, ",
650+
self.file_source.statistics().unwrap()
651+
)?;
655652

656653
DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
657654
write!(f, "}}")

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ pub fn serialize_file_scan_config(
507507

508508
Ok(protobuf::FileScanExecConf {
509509
file_groups,
510-
statistics: Some((&conf.statistics).into()),
510+
statistics: Some((&conf.file_source.statistics().unwrap()).into()),
511511
limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
512512
projection: conf
513513
.projection

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -740,20 +740,31 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
740740
let file_source = Arc::new(
741741
ParquetSource::new(options).with_predicate(Arc::clone(&file_schema), predicate),
742742
);
743+
let statistics = Statistics {
744+
num_rows: Precision::Inexact(100),
745+
total_byte_size: Precision::Inexact(1024),
746+
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
747+
Field::new("col", DataType::Utf8, false),
748+
]))),
749+
};
743750

744-
let scan_config =
745-
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
746-
.with_file_groups(vec![vec![PartitionedFile::new(
747-
"/path/to/file.parquet".to_string(),
748-
1024,
749-
)]])
750-
.with_statistics(Statistics {
751-
num_rows: Precision::Inexact(100),
752-
total_byte_size: Precision::Inexact(1024),
753-
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(
754-
vec![Field::new("col", DataType::Utf8, false)],
755-
))),
756-
});
751+
let mut scan_config = FileScanConfig {
752+
object_store_url: ObjectStoreUrl::local_filesystem(),
753+
file_schema,
754+
file_groups: vec![vec![PartitionedFile::new(
755+
"/path/to/file.parquet".to_string(),
756+
1024,
757+
)]],
758+
constraints: Constraints::empty(),
759+
projection: None,
760+
limit: None,
761+
table_partition_cols: vec![],
762+
output_ordering: vec![],
763+
file_compression_type: FileCompressionType::UNCOMPRESSED,
764+
new_lines_in_values: false,
765+
file_source: source,
766+
};
767+
scan_config = scan_config.with_statistics(statistics);
757768

758769
roundtrip_test(scan_config.build())
759770
}
@@ -795,19 +806,32 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
795806
.with_predicate(Arc::clone(&file_schema), custom_predicate_expr),
796807
);
797808

798-
let scan_config =
799-
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
800-
.with_file_groups(vec![vec![PartitionedFile::new(
801-
"/path/to/file.parquet".to_string(),
802-
1024,
803-
)]])
804-
.with_statistics(Statistics {
805-
num_rows: Precision::Inexact(100),
806-
total_byte_size: Precision::Inexact(1024),
807-
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(
808-
vec![Field::new("col", DataType::Utf8, false)],
809-
))),
810-
});
809+
let statistics = Statistics {
810+
num_rows: Precision::Inexact(100),
811+
total_byte_size: Precision::Inexact(1024),
812+
column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![
813+
Field::new("col", DataType::Utf8, false),
814+
]))),
815+
};
816+
817+
let mut scan_config = FileScanConfig {
818+
object_store_url: ObjectStoreUrl::local_filesystem(),
819+
file_schema,
820+
file_groups: vec![vec![PartitionedFile::new(
821+
"/path/to/file.parquet".to_string(),
822+
1024,
823+
)]],
824+
constraints: Constraints::empty(),
825+
826+
projection: None,
827+
limit: None,
828+
table_partition_cols: vec![],
829+
output_ordering: vec![],
830+
file_compression_type: FileCompressionType::UNCOMPRESSED,
831+
new_lines_in_values: false,
832+
file_source: source,
833+
};
834+
scan_config = scan_config.with_statistics(statistics);
811835

812836
#[derive(Debug, Clone, Eq)]
813837
struct CustomPredicateExpr {
@@ -1587,18 +1611,23 @@ async fn roundtrip_projection_source() -> Result<()> {
15871611

15881612
let statistics = Statistics::new_unknown(&schema);
15891613

1590-
let file_source = ParquetSource::default().with_statistics(statistics.clone());
1591-
let scan_config = FileScanConfig::new(
1592-
ObjectStoreUrl::local_filesystem(),
1593-
schema.clone(),
1594-
file_source,
1595-
)
1596-
.with_file_groups(vec![vec![PartitionedFile::new(
1597-
"/path/to/file.parquet".to_string(),
1598-
1024,
1599-
)]])
1600-
.with_statistics(statistics)
1601-
.with_projection(Some(vec![0, 1, 2]));
1614+
let source = ParquetSource::default().with_statistics(statistics.clone());
1615+
let scan_config = FileScanConfig {
1616+
object_store_url: ObjectStoreUrl::local_filesystem(),
1617+
file_groups: vec![vec![PartitionedFile::new(
1618+
"/path/to/file.parquet".to_string(),
1619+
1024,
1620+
)]],
1621+
constraints: Constraints::empty(),
1622+
file_schema: schema.clone(),
1623+
projection: Some(vec![0, 1, 2]),
1624+
limit: None,
1625+
table_partition_cols: vec![],
1626+
output_ordering: vec![],
1627+
file_compression_type: FileCompressionType::UNCOMPRESSED,
1628+
new_lines_in_values: false,
1629+
file_source: source,
1630+
};
16021631

16031632
let filter = Arc::new(
16041633
FilterExec::try_new(

0 commit comments

Comments
 (0)