Skip to content

Commit 28a2820

Browse files
committed
save
1 parent 4d18715 commit 28a2820

File tree

9 files changed

+219
-156
lines changed

9 files changed

+219
-156
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ use datafusion_physical_expr::{
5454
use async_trait::async_trait;
5555
use datafusion_catalog::Session;
5656
use datafusion_common::stats::Precision;
57-
use datafusion_datasource::add_row_stats;
5857
use datafusion_datasource::compute_all_files_statistics;
5958
use datafusion_datasource::file_groups::FileGroup;
6059
use datafusion_physical_expr_common::sort_expr::LexRequirement;
60+
use datafusion_physical_plan::statistics::add_row_stats;
6161
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
6262
use itertools::Itertools;
6363
use object_store::ObjectStore;

datafusion/datasource/src/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ use std::pin::Pin;
6060
use std::sync::Arc;
6161

6262
pub use self::url::ListingTableUrl;
63-
pub use statistics::add_row_stats;
6463
pub use statistics::compute_all_files_statistics;
6564

6665
/// Stream of files listed from the object store

datafusion/datasource/src/statistics.rs

Lines changed: 22 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
//! respect to the required sort order. See [`MinMaxStatistics`]
2222
2323
use futures::{Stream, StreamExt};
24-
use std::mem;
2524
use std::sync::Arc;
2625

2726
use crate::file_groups::FileGroup;
@@ -34,10 +33,12 @@ use arrow::{
3433
row::{Row, Rows},
3534
};
3635
use datafusion_common::stats::Precision;
37-
use datafusion_common::ScalarValue;
3836
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
3937
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
4038
use datafusion_physical_expr_common::sort_expr::LexOrdering;
39+
use datafusion_physical_plan::statistics::{
40+
add_row_stats, compute_summary_statistics, set_max_if_greater, set_min_if_lesser,
41+
};
4142
use datafusion_physical_plan::{ColumnStatistics, Statistics};
4243

4344
/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
@@ -409,62 +410,6 @@ pub async fn get_statistics_with_limit(
409410
Ok((result_files, statistics))
410411
}
411412

412-
/// Generic function to compute statistics across multiple items that have statistics
413-
fn compute_summary_statistics<T, I>(
414-
items: I,
415-
file_schema: &SchemaRef,
416-
stats_extractor: impl Fn(&T) -> Option<&Statistics>,
417-
) -> Statistics
418-
where
419-
I: IntoIterator<Item = T>,
420-
{
421-
let size = file_schema.fields().len();
422-
let mut col_stats_set = vec![ColumnStatistics::default(); size];
423-
let mut num_rows = Precision::<usize>::Absent;
424-
let mut total_byte_size = Precision::<usize>::Absent;
425-
426-
for (idx, item) in items.into_iter().enumerate() {
427-
if let Some(item_stats) = stats_extractor(&item) {
428-
if idx == 0 {
429-
// First item, set values directly
430-
num_rows = item_stats.num_rows;
431-
total_byte_size = item_stats.total_byte_size;
432-
for (index, column_stats) in
433-
item_stats.column_statistics.iter().enumerate()
434-
{
435-
col_stats_set[index].null_count = column_stats.null_count;
436-
col_stats_set[index].max_value = column_stats.max_value.clone();
437-
col_stats_set[index].min_value = column_stats.min_value.clone();
438-
col_stats_set[index].sum_value = column_stats.sum_value.clone();
439-
}
440-
continue;
441-
}
442-
443-
// Accumulate statistics for subsequent items
444-
num_rows = add_row_stats(item_stats.num_rows, num_rows);
445-
total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);
446-
447-
for (item_col_stats, col_stats) in item_stats
448-
.column_statistics
449-
.iter()
450-
.zip(col_stats_set.iter_mut())
451-
{
452-
col_stats.null_count =
453-
add_row_stats(item_col_stats.null_count, col_stats.null_count);
454-
set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
455-
set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
456-
col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
457-
}
458-
}
459-
}
460-
461-
Statistics {
462-
num_rows,
463-
total_byte_size,
464-
column_statistics: col_stats_set,
465-
}
466-
}
467-
468413
/// Computes the summary statistics for a group of files (`FileGroup`-level statistics).
469414
///
470415
/// This function combines statistics from all files in the file group to create
@@ -489,10 +434,11 @@ pub fn compute_file_group_statistics(
489434
return Ok(file_group);
490435
}
491436

492-
let statistics =
493-
compute_summary_statistics(file_group.iter(), &file_schema, |file| {
494-
file.statistics.as_ref().map(|stats| stats.as_ref())
495-
});
437+
let statistics = compute_summary_statistics(
438+
file_group.iter(),
439+
file_schema.fields().len(),
440+
|file| file.statistics.as_ref().map(|stats| stats.as_ref()),
441+
);
496442

497443
Ok(file_group.with_statistics(statistics))
498444
}
@@ -532,10 +478,11 @@ pub fn compute_all_files_statistics(
532478
}
533479

534480
// Then summary statistics across all file groups
535-
let mut statistics =
536-
compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| {
537-
file_group.statistics()
538-
});
481+
let mut statistics = compute_summary_statistics(
482+
&file_groups_with_stats,
483+
file_schema.fields().len(),
484+
|file_group| file_group.statistics(),
485+
);
539486

540487
if inexact_stats {
541488
statistics = statistics.to_inexact()
@@ -544,79 +491,6 @@ pub fn compute_all_files_statistics(
544491
Ok((file_groups_with_stats, statistics))
545492
}
546493

547-
pub fn add_row_stats(
548-
file_num_rows: Precision<usize>,
549-
num_rows: Precision<usize>,
550-
) -> Precision<usize> {
551-
match (file_num_rows, &num_rows) {
552-
(Precision::Absent, _) => num_rows.to_inexact(),
553-
(lhs, Precision::Absent) => lhs.to_inexact(),
554-
(lhs, rhs) => lhs.add(rhs),
555-
}
556-
}
557-
558-
/// If the given value is numerically greater than the original maximum value,
559-
/// return the new maximum value with appropriate exactness information.
560-
fn set_max_if_greater(
561-
max_nominee: &Precision<ScalarValue>,
562-
max_value: &mut Precision<ScalarValue>,
563-
) {
564-
match (&max_value, max_nominee) {
565-
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
566-
*max_value = max_nominee.clone();
567-
}
568-
(Precision::Exact(val1), Precision::Inexact(val2))
569-
| (Precision::Inexact(val1), Precision::Inexact(val2))
570-
| (Precision::Inexact(val1), Precision::Exact(val2))
571-
if val1 < val2 =>
572-
{
573-
*max_value = max_nominee.clone().to_inexact();
574-
}
575-
(Precision::Exact(_), Precision::Absent) => {
576-
let exact_max = mem::take(max_value);
577-
*max_value = exact_max.to_inexact();
578-
}
579-
(Precision::Absent, Precision::Exact(_)) => {
580-
*max_value = max_nominee.clone().to_inexact();
581-
}
582-
(Precision::Absent, Precision::Inexact(_)) => {
583-
*max_value = max_nominee.clone();
584-
}
585-
_ => {}
586-
}
587-
}
588-
589-
/// If the given value is numerically lesser than the original minimum value,
590-
/// return the new minimum value with appropriate exactness information.
591-
fn set_min_if_lesser(
592-
min_nominee: &Precision<ScalarValue>,
593-
min_value: &mut Precision<ScalarValue>,
594-
) {
595-
match (&min_value, min_nominee) {
596-
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
597-
*min_value = min_nominee.clone();
598-
}
599-
(Precision::Exact(val1), Precision::Inexact(val2))
600-
| (Precision::Inexact(val1), Precision::Inexact(val2))
601-
| (Precision::Inexact(val1), Precision::Exact(val2))
602-
if val1 > val2 =>
603-
{
604-
*min_value = min_nominee.clone().to_inexact();
605-
}
606-
(Precision::Exact(_), Precision::Absent) => {
607-
let exact_min = mem::take(min_value);
608-
*min_value = exact_min.to_inexact();
609-
}
610-
(Precision::Absent, Precision::Exact(_)) => {
611-
*min_value = min_nominee.clone().to_inexact();
612-
}
613-
(Precision::Absent, Precision::Inexact(_)) => {
614-
*min_value = min_nominee.clone();
615-
}
616-
_ => {}
617-
}
618-
}
619-
620494
#[cfg(test)]
621495
mod tests {
622496
use super::*;
@@ -679,7 +553,9 @@ mod tests {
679553

680554
// Call compute_summary_statistics
681555
let summary_stats =
682-
compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
556+
compute_summary_statistics(items, schema.fields().len(), |item| {
557+
Some(item.as_ref())
558+
});
683559

684560
// Verify the results
685561
assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15
@@ -754,7 +630,9 @@ mod tests {
754630
let items = vec![Arc::new(stats1), Arc::new(stats2)];
755631

756632
let summary_stats =
757-
compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
633+
compute_summary_statistics(items, schema.fields().len(), |item| {
634+
Some(item.as_ref())
635+
});
758636

759637
assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
760638
assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
@@ -784,7 +662,9 @@ mod tests {
784662
let items: Vec<Arc<Statistics>> = vec![];
785663

786664
let summary_stats =
787-
compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
665+
compute_summary_statistics(items, schema.fields().len(), |item| {
666+
Some(item.as_ref())
667+
});
788668

789669
// Verify default values for empty collection
790670
assert_eq!(summary_stats.num_rows, Precision::Absent);

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
4747
use datafusion_execution::TaskContext;
4848
use datafusion_physical_expr::equivalence::join_equivalence_properties;
4949

50+
use crate::statistics::compute_summary_statistics;
5051
use async_trait::async_trait;
5152
use futures::{ready, Stream, StreamExt, TryStreamExt};
5253

@@ -345,28 +346,25 @@ impl ExecutionPlan for CrossJoinExec {
345346
}
346347

347348
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
348-
todo!()
349-
/*
350349
let left_stats = self.left.statistics_by_partition()?;
351-
// Summarize the `left_stats`
352-
let statistics =
353-
compute_summary_statistics(file_group.iter(), &file_schema, |file| {
354-
file.statistics.as_ref().map(|stats| stats.as_ref())
355-
});
356350
let right_stats = self.right.statistics_by_partition()?;
357351

358352
if left_stats.is_empty() || right_stats.is_empty() {
359353
return Ok(vec![]);
360354
}
361355

356+
// Summarize the `left_stats`
357+
let statistics = compute_summary_statistics(
358+
left_stats.iter(),
359+
self.schema.fields().len(),
360+
|stats| Some(stats),
361+
);
362+
362363
let mut stats = Vec::new();
363-
for left in left_stats.iter() {
364-
for right in right_stats.iter() {
365-
stats.push(stats_cartesian_product(left.clone(), right.clone()));
366-
}
364+
for right in right_stats.iter() {
365+
stats.push(stats_cartesian_product(statistics.clone(), right.clone()));
367366
}
368367
Ok(stats)
369-
*/
370368
}
371369

372370
/// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done,

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,5 +90,6 @@ pub mod udaf {
9090
}
9191

9292
pub mod coalesce;
93+
pub mod statistics;
9394
#[cfg(test)]
9495
pub mod test;

datafusion/physical-plan/src/limit.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,22 @@ impl ExecutionPlan for LocalLimitExec {
347347
)
348348
}
349349

350+
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
351+
let input_stats = self.input.statistics_by_partition()?;
352+
let mut stats = Vec::with_capacity(input_stats.len());
353+
for input_stat in input_stats {
354+
let stat = Statistics::with_fetch(
355+
input_stat,
356+
self.schema(),
357+
Some(self.fetch),
358+
0,
359+
1,
360+
)?;
361+
stats.push(stat);
362+
}
363+
Ok(stats)
364+
}
365+
350366
fn fetch(&self) -> Option<usize> {
351367
Some(self.fetch)
352368
}

datafusion/physical-plan/src/projection.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,20 @@ impl ExecutionPlan for ProjectionExec {
251251
))
252252
}
253253

254+
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
255+
let input_stats = self.input.statistics_by_partition()?;
256+
let mut stats = Vec::with_capacity(input_stats.len());
257+
for input_stat in input_stats {
258+
let stat = stats_projection(
259+
input_stat.clone(),
260+
self.expr.iter().map(|(e, _)| Arc::clone(e)),
261+
Arc::clone(&self.schema),
262+
);
263+
stats.push(stat);
264+
}
265+
Ok(stats)
266+
}
267+
254268
fn supports_limit_pushdown(&self) -> bool {
255269
true
256270
}

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
346346
self.input.statistics()
347347
}
348348

349+
/// Returns a single-element vector holding the result of `self.statistics()`.
///
/// Only one entry is reported, presumably because the merge produces a
/// single output partition — confirm against the node's partitioning.
///
/// # Errors
///
/// Propagates any error returned by `self.statistics()`.
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
    let merged = self.statistics()?;
    Ok(vec![merged])
}
352+
349353
fn supports_limit_pushdown(&self) -> bool {
350354
true
351355
}

0 commit comments

Comments
 (0)