Skip to content

Commit 073ecf4

Browse files
xudong963 authored and Nirnay Roy committed
Support computing statistics for FileGroup (apache#15432)
* Support computing statistics for FileGroup

* avoid clone

* add tests

* fix conflicts
1 parent 0775dcf commit 073ecf4

File tree

8 files changed

+443
-33
lines changed

8 files changed

+443
-33
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 98 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,16 @@
1717

1818
//! The table implementation.
1919
20-
use std::collections::HashMap;
21-
use std::{any::Any, str::FromStr, sync::Arc};
22-
2320
use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
2421
use super::{ListingTableUrl, PartitionedFile};
22+
use std::collections::HashMap;
23+
use std::{any::Any, str::FromStr, sync::Arc};
2524

2625
use crate::datasource::{
2726
create_ordering,
2827
file_format::{
2928
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
3029
},
31-
get_statistics_with_limit,
3230
physical_plan::FileSinkConfig,
3331
};
3432
use crate::execution::context::SessionState;
@@ -55,9 +53,12 @@ use datafusion_physical_expr::{
5553

5654
use async_trait::async_trait;
5755
use datafusion_catalog::Session;
56+
use datafusion_common::stats::Precision;
57+
use datafusion_datasource::add_row_stats;
58+
use datafusion_datasource::compute_all_files_statistics;
5859
use datafusion_datasource::file_groups::FileGroup;
5960
use datafusion_physical_expr_common::sort_expr::LexRequirement;
60-
use futures::{future, stream, StreamExt, TryStreamExt};
61+
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
6162
use itertools::Itertools;
6263
use object_store::ObjectStore;
6364

@@ -1115,32 +1116,26 @@ impl ListingTable {
11151116
let files = file_list
11161117
.map(|part_file| async {
11171118
let part_file = part_file?;
1118-
if self.options.collect_stat {
1119-
let statistics =
1120-
self.do_collect_statistics(ctx, &store, &part_file).await?;
1121-
Ok((part_file, statistics))
1119+
let statistics = if self.options.collect_stat {
1120+
self.do_collect_statistics(ctx, &store, &part_file).await?
11221121
} else {
1123-
Ok((
1124-
part_file,
1125-
Arc::new(Statistics::new_unknown(&self.file_schema)),
1126-
))
1127-
}
1122+
Arc::new(Statistics::new_unknown(&self.file_schema))
1123+
};
1124+
Ok(part_file.with_statistics(statistics))
11281125
})
11291126
.boxed()
11301127
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11311128

1132-
let (files, statistics) = get_statistics_with_limit(
1133-
files,
1129+
let (file_group, inexact_stats) =
1130+
get_files_with_limit(files, limit, self.options.collect_stat).await?;
1131+
1132+
let file_groups = file_group.split_files(self.options.target_partitions);
1133+
compute_all_files_statistics(
1134+
file_groups,
11341135
self.schema(),
1135-
limit,
11361136
self.options.collect_stat,
1137+
inexact_stats,
11371138
)
1138-
.await?;
1139-
1140-
Ok((
1141-
files.split_files(self.options.target_partitions),
1142-
statistics,
1143-
))
11441139
}
11451140

11461141
/// Collects statistics for a given partitioned file.
@@ -1182,6 +1177,86 @@ impl ListingTable {
11821177
}
11831178
}
11841179

1180+
/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
1181+
///
1182+
/// This function collects files from the provided stream until either:
1183+
/// 1. The stream is exhausted
1184+
/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
1185+
///
1186+
/// # Arguments
1187+
/// * `files` - A stream of `Result<PartitionedFile>` items to process
1188+
/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
1189+
/// once the accumulated number of rows exceeds this limit
1190+
/// * `collect_stats` - Whether to collect and accumulate statistics from the files
1191+
///
1192+
/// # Returns
1193+
/// A `Result` containing a `FileGroup` with the collected files
1194+
/// and a boolean indicating whether the statistics are inexact.
1195+
///
1196+
/// # Note
1197+
/// The function will continue processing files if statistics are not available or if the
1198+
/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
1199+
/// but files will still be collected.
1200+
async fn get_files_with_limit(
1201+
files: impl Stream<Item = Result<PartitionedFile>>,
1202+
limit: Option<usize>,
1203+
collect_stats: bool,
1204+
) -> Result<(FileGroup, bool)> {
1205+
let mut file_group = FileGroup::default();
1206+
// Fusing the stream allows us to call next safely even once it is finished.
1207+
let mut all_files = Box::pin(files.fuse());
1208+
let mut num_rows = Precision::<usize>::Absent;
1209+
while let Some(first_file) = all_files.next().await {
1210+
let file = first_file?;
1211+
if let Some(file_statistic) = &file.statistics {
1212+
num_rows = file_statistic.num_rows;
1213+
}
1214+
file_group.push(file);
1215+
1216+
// If the number of rows exceeds the limit, we can stop processing
1217+
// files. This only applies when we know the number of rows. It also
1218+
// currently ignores tables that have no statistics regarding the
1219+
// number of rows.
1220+
let conservative_num_rows = match num_rows {
1221+
Precision::Exact(nr) => nr,
1222+
_ => usize::MIN,
1223+
};
1224+
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
1225+
while let Some(current) = all_files.next().await {
1226+
let file = current?;
1227+
if !collect_stats {
1228+
file_group.push(file);
1229+
continue;
1230+
}
1231+
1232+
// We accumulate the number of rows, total byte size and null
1233+
// counts across all the files in question. If any file does not
1234+
// provide any information or provides an inexact value, we demote
1235+
// the statistic precision to inexact.
1236+
if let Some(file_stats) = &file.statistics {
1237+
num_rows = add_row_stats(num_rows, file_stats.num_rows);
1238+
}
1239+
file_group.push(file);
1240+
1241+
// If the number of rows exceeds the limit, we can stop processing
1242+
// files. This only applies when we know the number of rows. It also
1243+
// currently ignores tables that have no statistics regarding the
1244+
// number of rows.
1245+
if num_rows.get_value().unwrap_or(&usize::MIN)
1246+
> &limit.unwrap_or(usize::MAX)
1247+
{
1248+
break;
1249+
}
1250+
}
1251+
}
1252+
}
1253+
// If we still have files in the stream, it means that the limit kicked
1254+
// in, and the statistic could have been different had we processed the
1255+
// files in a different order.
1256+
let inexact_stats = all_files.next().await.is_some();
1257+
Ok((file_group, inexact_stats))
1258+
}
1259+
11851260
#[cfg(test)]
11861261
mod tests {
11871262
use super::*;

datafusion/core/src/datasource/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ pub use datafusion_catalog::default_table_source;
4242
pub use datafusion_catalog::memory;
4343
pub use datafusion_catalog::stream;
4444
pub use datafusion_catalog::view;
45-
pub use datafusion_datasource::get_statistics_with_limit;
4645
pub use datafusion_datasource::schema_adapter;
4746
pub use datafusion_datasource::sink;
4847
pub use datafusion_datasource::source;

datafusion/datasource/src/file_groups.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,11 @@ impl FileGroup {
418418
self.files.push(file);
419419
}
420420

421+
/// Get the statistics for this group
422+
pub fn statistics(&self) -> Option<&Statistics> {
423+
self.statistics.as_ref()
424+
}
425+
421426
/// Partition the list of files into `n` groups
422427
pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
423428
if self.is_empty() {

datafusion/datasource/src/file_scan_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,7 +2000,7 @@ mod tests {
20002000
},
20012001
partition_values: vec![ScalarValue::from(file.date)],
20022002
range: None,
2003-
statistics: Some(Statistics {
2003+
statistics: Some(Arc::new(Statistics {
20042004
num_rows: Precision::Absent,
20052005
total_byte_size: Precision::Absent,
20062006
column_statistics: file
@@ -2020,7 +2020,7 @@ mod tests {
20202020
.unwrap_or_default()
20212021
})
20222022
.collect::<Vec<_>>(),
2023-
}),
2023+
})),
20242024
extensions: None,
20252025
metadata_size_hint: None,
20262026
}

datafusion/datasource/src/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ use std::pin::Pin;
6060
use std::sync::Arc;
6161

6262
pub use self::url::ListingTableUrl;
63-
pub use statistics::get_statistics_with_limit;
63+
pub use statistics::add_row_stats;
64+
pub use statistics::compute_all_files_statistics;
6465

6566
/// Stream of files get listed from object store
6667
pub type PartitionedFileStream =
@@ -107,7 +108,7 @@ pub struct PartitionedFile {
107108
///
108109
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
109110
/// so if they are incorrect, incorrect answers may result.
110-
pub statistics: Option<Statistics>,
111+
pub statistics: Option<Arc<Statistics>>,
111112
/// An optional field for user defined per object metadata
112113
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
113114
/// The estimated size of the parquet metadata, in bytes
@@ -187,6 +188,12 @@ impl PartitionedFile {
187188
self.extensions = Some(extensions);
188189
self
189190
}
191+
192+
// Update the statistics for this file.
193+
pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
194+
self.statistics = Some(statistics);
195+
self
196+
}
190197
}
191198

192199
impl From<ObjectMeta> for PartitionedFile {

0 commit comments

Comments (0)