
Commit 85f92ef

Apply projection to Statistics in FilterExec (#13187)
* Apply projection to `Statistics` in `FilterExec`
* Use `Statistics::project` in `HashJoin`
1 parent a9d4d52 commit 85f92ef

File tree (4 files changed: +77 additions, -12 deletions)

* datafusion/common/src/stats.rs
* datafusion/physical-plan/src/filter.rs
* datafusion/physical-plan/src/joins/hash_join.rs
* datafusion/sqllogictest/test_files/parquet.slt

datafusion/common/src/stats.rs

Lines changed: 20 additions & 0 deletions
@@ -258,6 +258,26 @@ impl Statistics {
         self
     }

+    /// Project the statistics to the given column indices.
+    ///
+    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
+    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
+    /// "b"}`.
+    pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
+        let Some(projection) = projection else {
+            return self;
+        };
+
+        // todo: it would be nice to avoid cloning column statistics if
+        // possible (e.g. if the projection did not contain duplicates)
+        self.column_statistics = projection
+            .iter()
+            .map(|&i| self.column_statistics[i].clone())
+            .collect();
+
+        self
+    }
+
     /// Calculates the statistics after `fetch` and `skip` operations apply.
     /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
     /// parameter to compute global statistics in a multi-partition setting.
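
Outside the diff, a minimal usage sketch of the new method (not part of this commit; the example schema is hypothetical, and it assumes the `datafusion` crate's `datafusion::common` / `datafusion::arrow` re-exports plus the existing `Statistics::new_unknown` constructor, which may vary slightly by version):

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::stats::Precision;
use datafusion::common::Statistics;

fn main() {
    // Statistics for columns {"a", "b", "c"}; only "c" has a known null count.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
        Field::new("c", DataType::Int64, false),
    ]);
    let mut stats = Statistics::new_unknown(&schema);
    stats.column_statistics[2].null_count = Precision::Exact(42);

    // Projecting to [2, 1] keeps statistics for {"c", "b"}, in that order;
    // num_rows and total_byte_size are carried through unchanged.
    let projected = stats.project(Some(&vec![2, 1]));
    assert_eq!(projected.column_statistics.len(), 2);
    assert_eq!(projected.column_statistics[0].null_count, Precision::Exact(42));

    // Passing `None` leaves the statistics untouched.
    let untouched = Statistics::new_unknown(&schema).project(None);
    assert_eq!(untouched.column_statistics.len(), 3);
}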

datafusion/physical-plan/src/filter.rs

Lines changed: 6 additions & 1 deletion
@@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec {
     /// The output statistics of a filtering operation can be estimated if the
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Result<Statistics> {
-        Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
+        let stats = Self::statistics_helper(
+            &self.input,
+            self.predicate(),
+            self.default_selectivity,
+        )?;
+        Ok(stats.project(self.projection.as_ref()))
     }

     fn cardinality_effect(&self) -> CardinalityEffect {
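
For illustration only, a sketch of the effect of this change (not from the commit; the schema and predicate are made up, and it assumes `EmptyExec`, `FilterExec::with_projection`, and the `col`/`lit` physical-expression helpers as exposed by the `datafusion` crate around this revision):

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;

fn main() -> Result<()> {
    // Input plan with columns {"a", "b", "c"} (hypothetical example schema).
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]));
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // Predicate `b > 10`, plus an embedded projection keeping only {"c", "a"}.
    let predicate = Arc::new(BinaryExpr::new(
        col("b", &schema)?,
        Operator::Gt,
        lit(10i32),
    ));
    let filter = FilterExec::try_new(predicate, input)?
        .with_projection(Some(vec![2, 0]))?;

    // With this commit, the reported statistics describe the projected output
    // (2 columns) rather than the full 3-column input schema.
    let stats = filter.statistics()?;
    assert_eq!(stats.column_statistics.len(), 2);
    Ok(())
}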

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 2 additions & 11 deletions
@@ -785,24 +785,15 @@ impl ExecutionPlan for HashJoinExec {
         // TODO stats: it is not possible in general to know the output size of joins
         // There are some special cases though, for example:
         // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
-        let mut stats = estimate_join_statistics(
+        let stats = estimate_join_statistics(
             Arc::clone(&self.left),
             Arc::clone(&self.right),
             self.on.clone(),
             &self.join_type,
             &self.join_schema,
         )?;
         // Project statistics if there is a projection
-        if let Some(projection) = &self.projection {
-            stats.column_statistics = stats
-                .column_statistics
-                .into_iter()
-                .enumerate()
-                .filter(|(i, _)| projection.contains(i))
-                .map(|(_, s)| s)
-                .collect();
-        }
-        Ok(stats)
+        Ok(stats.project(self.projection.as_ref()))
     }
 }
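
Note: `Statistics::project` maps column statistics by the projection indices, so the result follows the projection's column order (and repeats entries for duplicated indices), whereas the removed `contains`-based filter kept column statistics in their original input order. Row count and byte size estimates are passed through unchanged in both versions.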

datafusion/sqllogictest/test_files/parquet.slt

Lines changed: 49 additions & 0 deletions
@@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91

 statement ok
 DROP TABLE test_non_utf8_binary;
+
+
+## Tests for https://github.com/apache/datafusion/issues/13186
+statement ok
+create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);
+
+statement ok
+insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);
+
+# must put it into a parquet file to get statistics
+statement ok
+copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';
+
+# Run queries against parquet files
+statement ok
+create external table cpu_parquet
+stored as parquet
+location 'test_files/scratch/parquet/cpu.parquet';
+
+# Double filtering
+#
+# Expect 1 row for both queries
+query PI
+select time, rn
+from (
+  select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+  from cpu
+  where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+query PI
+select time, rn
+from (
+  select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+  from cpu_parquet
+  where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+
+# Clean up
+statement ok
+drop table cpu;
+
+statement ok
+drop table cpu_parquet;
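
These cases follow the existing sqllogictest conventions (`statement ok`, `query PI`, `----` result blocks). They can typically be run with DataFusion's sqllogictest harness, e.g. `cargo test --test sqllogictests -- parquet`; see the sqllogictest README in the repository for the exact invocation.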

0 commit comments