Implement Parquet filter pushdown via new filter pushdown APIs #15769

Open · wants to merge 7 commits into main
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_parquet_index.rs
@@ -495,7 +495,7 @@ impl TableProvider for IndexTableProvider {
ParquetSource::default()
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
.with_predicate(Arc::clone(&schema), predicate)
.with_predicate(predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
3 changes: 1 addition & 2 deletions datafusion-examples/examples/parquet_index.rs
@@ -242,8 +242,7 @@ impl TableProvider for IndexTableProvider {
let files = self.index.get_files(predicate.clone())?;

let object_store_url = ObjectStoreUrl::parse("file://")?;
let source =
Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
let source = Arc::new(ParquetSource::default().with_predicate(predicate));
let mut file_scan_config_builder =
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
.with_projection(projection.cloned())
2 changes: 0 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -54,7 +54,6 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::ObjectWriterBuilder;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use async_trait::async_trait;
@@ -174,7 +173,6 @@ impl FileFormat for ArrowFormat {
&self,
_state: &dyn Session,
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let source = Arc::new(ArrowSource::default());
let config = FileScanConfigBuilder::from(conf)
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -93,7 +93,6 @@ pub(crate) mod test_util {
.with_projection(projection)
.with_limit(limit)
.build(),
None,
)
.await?;
Ok(exec)
39 changes: 4 additions & 35 deletions datafusion/core/src/datasource/listing/table.rs
@@ -24,9 +24,7 @@ use std::{any::Any, str::FromStr, sync::Arc};

use crate::datasource::{
create_ordering,
file_format::{
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
},
file_format::{file_compression_type::FileCompressionType, FileFormat},
physical_plan::FileSinkConfig,
};
use crate::execution::context::SessionState;
@@ -35,22 +33,19 @@ use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::{ExecutionPlan, Statistics};

use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use datafusion_common::{
config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
SchemaExt, ToDFSchema,
config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt,
};
use datafusion_execution::cache::{
cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
};
use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement};

use async_trait::async_trait;
use datafusion_catalog::Session;
@@ -941,19 +936,6 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

let filters = match conjunction(filters.to_vec()) {
Some(expr) => {
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
state.execution_props(),
)?;
Some(filters)
}
None => None,
};

let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
@@ -978,7 +960,6 @@
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.build(),
filters.as_ref(),
)
.await
}
@@ -1002,18 +983,6 @@
return Ok(TableProviderFilterPushDown::Exact);
}

// if we can't push it down completely with only the filename-based/path-based
Contributor:
This change makes sense to me -- when @itsjunetime originally implemented this code, there was some complexity because there was no way to do filter pushdown in ExecutionPlans so in my mind this approach was a (clever) workaround

The comments even hint that this is a parquet specific special case

I think the new pattern of handling predicates more generally in this PR is cleaner and will support more cases. Since this code is only currently executed

Perhaps @cisaacson has some other thoughts

// column names, then we should check if we can do parquet predicate pushdown
let supports_pushdown = self.options.format.supports_filters_pushdown(
&self.file_schema,
&self.table_schema,
&[filter],
)?;

if supports_pushdown == FilePushdownSupport::Supported {
return Ok(TableProviderFilterPushDown::Exact);
}
Comment on lines -1005 to -1015
Contributor Author:
The point of this PR is that this moves from being something specialized that ListingTable does to anything that works for any TableProvider / they don't need to do anything special! The checks for compatibility also happen all within the parquet data source machinery, instead of leaking implementations via supports_filters_pushdown.
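To make that concrete, here is a rough sketch (adapted from the updated `file_statistics.rs` test in this PR; the helper name and wiring are illustrative, not an existing API) of the generic flow any provider can now rely on:

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, lit, Expr};
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::ExecutionPlan;

/// Wrap an arbitrary scan in a FilterExec and let the generic physical
/// optimizer rule push the predicate into the data source.
fn push_filter_into_scan(
    ctx: &SessionContext,
    scan: Arc<dyn ExecutionPlan>, // e.g. a DataSourceExec over Parquet files
) -> Result<Arc<dyn ExecutionPlan>> {
    // Any non-partition filter; partition filters can still be pruned
    // exactly by the TableProvider itself.
    let filter: Expr = Expr::gt(col("id"), lit(1));
    let df_schema = DFSchema::try_from(scan.schema())?;
    let physical_filter = ctx.create_physical_expr(filter, &df_schema)?;

    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(physical_filter, scan)?);

    // The optimizer pass pushes the predicate into the scan when
    // `execution.parquet.pushdown_filters` is enabled.
    let mut options = ctx.state().config().options().clone();
    options.execution.parquet.pushdown_filters = true;
    FilterPushdown::new().optimize(plan, &options)
}
```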

Contributor:
I have one question: aren't we expecting/preparing for people to use ListingTable when they read Parquet files? Are we eventually planning to remove all format-specific handling? Or is this a case only for filter pushdown?

Contributor:
If that's the case, why don't we fully remove the supports_filters_pushdown() API altogether?

Contributor Author:
I think many users of DataFusion (based on our usage, talks I've seen and examples we have) use custom TableProvider implementations.

I would keep supports_filters_pushdown so that TableProviders can do Exact pruning of filters, e.g. using partition columns.
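For illustration, a minimal sketch of such an Exact-vs-Inexact answer (written as a free function for brevity; the `partition_cols` parameter and the function name are assumptions, not existing ListingTable code):

```rust
use std::collections::HashSet;

use datafusion_common::Result;
use datafusion_expr::{utils::expr_to_columns, Expr, TableProviderFilterPushDown};

/// Report Exact for filters that reference only partition columns (the planner
/// can drop them entirely) and Inexact for everything else (they are kept and
/// may later be pushed into the scan).
fn pushdown_support(
    partition_cols: &HashSet<String>,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    filters
        .iter()
        .map(|filter| {
            let mut cols = HashSet::new();
            expr_to_columns(filter, &mut cols)?;
            let only_partition_cols =
                cols.iter().all(|c| partition_cols.contains(&c.name));
            Ok(if only_partition_cols {
                TableProviderFilterPushDown::Exact
            } else {
                TableProviderFilterPushDown::Inexact
            })
        })
        .collect()
}
```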

Contributor:
We can justify implementing other TableProviders for Parquet, but I still cannot understand why we need to degrade the capabilities of our ListingTable. Isn't it always better to prune/simplify things at the highest level possible?

Contributor:
> I have one question: aren't we expecting/preparing for people to use ListingTable when they read Parquet files? Are we eventually planning to remove all format-specific handling? Or is this a case only for filter pushdown?

For what it is worth, we (InfluxData) don't use ListingTable to read parquet files; instead we provide our own equivalent and create the DataSourceExecs directly.

> I would keep supports_filters_pushdown so that TableProviders can do Exact pruning of filters, e.g. using partition columns.

Yes I think that is important too -- I don't think we should be removing any APIs from ListingTable
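For reference, a rough sketch of that approach, using only builder calls that appear elsewhere in this diff (file/file-group registration and options are omitted for brevity):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_execution::object_store::ObjectStoreUrl;

/// Build a DataSourceExec directly, without going through ListingTable.
fn direct_parquet_scan(schema: SchemaRef) -> Arc<DataSourceExec> {
    let source = Arc::new(ParquetSource::default());
    // In a real provider the files / file groups for the scan would also be
    // registered on this builder (omitted here).
    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema, source)
            .build();
    DataSourceExec::from_data_source(config)
}
```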

Contributor:
> We can justify implementing other TableProviders for Parquet, but I still cannot understand why we need to degrade the capabilities of our ListingTable. Isn't it always better to prune/simplify things at the highest level possible?

I don't think this degrades the capabilities of the current listing table. I think the only implications are for anyone who used a custom FileFormat and implemented supports_filters_pushdown -- I suspect this is not very common and we can likely avoid consternation by mentioning it in the upgrade guide (see comment below).


Ok(TableProviderFilterPushDown::Inexact)
})
.collect()
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -54,6 +54,7 @@ mod tests {
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;

use datafusion_datasource::file::FileSource;
use datafusion_datasource::{FileRange, PartitionedFile};
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_datasource_parquet::{
@@ -139,7 +140,7 @@ mod tests {
self.round_trip(batches).await.batches
}

fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
// set up predicate (this is normally done by a layer higher up)
let predicate = self
.predicate
Expand All @@ -148,7 +149,7 @@ mod tests {

let mut source = ParquetSource::default();
if let Some(predicate) = predicate {
source = source.with_predicate(Arc::clone(&file_schema), predicate);
source = source.with_predicate(predicate);
Comment on lines -151 to +152
Contributor Author:
This seemed like an easy win since I was able to just change this so that the schema is always passed in by the FileSourceConfigBuilder instead of only when with_predicate is called.
This was necessary because with_predicate is no longer called to attach a predicate; instead that happens during an optimization pass, so ParquetSource needs to have the schema available at that point.
I left with_predicate in there to avoid churn and in case there is a use case for attaching a predicate directly through the scan instead of as a FilterExec that later gets pushed into the scan.
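Condensed, the updated test helper now builds the source roughly like this (a sketch of the pattern in this diff, not new API surface):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_datasource::file::FileSource;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_physical_expr::PhysicalExpr;

/// The predicate no longer carries the schema; the schema is attached
/// separately and unconditionally.
fn build_source(
    file_schema: SchemaRef,
    predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Arc<dyn FileSource> {
    let mut source = ParquetSource::default();
    if let Some(predicate) = predicate {
        // Optional: attach a predicate directly when building the scan.
        source = source.with_predicate(predicate);
    }
    // Schema is always provided, independent of the predicate.
    source.with_schema(file_schema)
}
```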

}

if self.pushdown_predicate {
@@ -161,14 +162,14 @@
source = source.with_enable_page_index(true);
}

Arc::new(source)
source.with_schema(Arc::clone(&file_schema))
}

fn build_parquet_exec(
&self,
file_schema: SchemaRef,
file_group: FileGroup,
source: Arc<ParquetSource>,
source: Arc<dyn FileSource>,
) -> Arc<DataSourceExec> {
let base_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
10 changes: 6 additions & 4 deletions datafusion/core/src/test_util/parquet.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};

use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use object_store::path::Path;
@@ -182,10 +183,11 @@ impl TestParquetFile {
let physical_filter_expr =
create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;

let source = Arc::new(ParquetSource::new(parquet_options).with_predicate(
Arc::clone(&self.schema),
Arc::clone(&physical_filter_expr),
));
let source = Arc::new(
ParquetSource::new(parquet_options)
.with_predicate(Arc::clone(&physical_filter_expr)),
)
.with_schema(Arc::clone(&self.schema));
let config = scan_config_builder.with_source(source).build();
let parquet_exec = DataSourceExec::from_data_source(config);

2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/pruning.rs
@@ -276,7 +276,7 @@ async fn execute_with_predicate(
ctx: &SessionContext,
) -> Vec<String> {
let parquet_source = if prune_stats {
ParquetSource::default().with_predicate(Arc::clone(&schema), predicate.clone())
ParquetSource::default().with_predicate(predicate.clone())
} else {
ParquetSource::default()
};
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/external_access_plan.rs
@@ -346,7 +346,7 @@ impl TestFull {
let source = if let Some(predicate) = predicate {
let df_schema = DFSchema::try_from(schema.clone())?;
let predicate = ctx.create_physical_expr(predicate, &df_schema)?;
Arc::new(ParquetSource::default().with_predicate(schema.clone(), predicate))
Arc::new(ParquetSource::default().with_predicate(predicate))
} else {
Arc::new(ParquetSource::default())
};
24 changes: 22 additions & 2 deletions datafusion/core/tests/parquet/file_statistics.rs
@@ -28,6 +28,7 @@ use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_common::stats::Precision;
use datafusion_common::DFSchema;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultListFilesCache,
@@ -37,6 +38,10 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit, Expr};

use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::ExecutionPlan;
use tempfile::tempdir;

#[tokio::test]
@@ -48,6 +53,9 @@ async fn check_stats_precision_with_filter_pushdown() {
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let table = get_listing_table(&table_path, None, &opt).await;
let (_, _, state) = get_cache_runtime_state();
let mut options = state.config().options().clone();
options.execution.parquet.pushdown_filters = true;

// Scan without filter, stats are exact
let exec = table.scan(&state, None, &[], None).await.unwrap();
assert_eq!(
@@ -56,9 +64,21 @@
);

// Scan with filter pushdown, stats are inexact
// This is a filter that cannot be evaluated during table provider planning
// (it is not a partition filter), so it will be pushed down to the scan
// by the appropriate optimizer pass.
let filter = Expr::gt(col("id"), lit(1));

let exec = table.scan(&state, None, &[filter], None).await.unwrap();
let exec = table
.scan(&state, None, &[filter.clone()], None)
.await
.unwrap();
let ctx = SessionContext::new();
let df_schema = DFSchema::try_from(table.schema()).unwrap();
let filter = ctx.create_physical_expr(filter, &df_schema).unwrap();
let exec =
Arc::new(FilterExec::try_new(filter, exec).unwrap()) as Arc<dyn ExecutionPlan>;
let exec = FilterPushdown::new().optimize(exec, &options).unwrap();
assert!(exec.as_any().is::<DataSourceExec>()); // sanity check that the pushdown did what we expected
assert_eq!(
exec.partition_statistics(None).unwrap().num_rows,
Precision::Inexact(8)
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/page_pruning.rs
@@ -77,7 +77,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec

let source = Arc::new(
ParquetSource::default()
.with_predicate(Arc::clone(&schema), predicate)
.with_predicate(predicate)
.with_enable_page_index(true),
);
let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)
6 changes: 3 additions & 3 deletions datafusion/core/tests/physical_optimizer/push_down_filter.rs
@@ -47,7 +47,7 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PredicateSupports,
ChildPushdownSupports, FilterPushdownPropagation,
};
use datafusion_physical_plan::{
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -98,7 +98,7 @@ impl FileSource for TestSource {
}

fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
todo!("should not be called")
Arc::new(self.clone()) as Arc<dyn FileSource>
}

fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
@@ -166,7 +166,7 @@ impl FileSource for TestSource {
statistics: self.statistics.clone(), // should be updated in reality
});
Ok(FilterPushdownPropagation {
filters: PredicateSupports::all_supported(filters),
filters: ChildPushdownSupports::all_supported(filters),
updated_node: Some(new_node),
})
} else {
53 changes: 0 additions & 53 deletions datafusion/core/tests/sql/path_partition.rs
@@ -25,8 +25,6 @@ use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion::{
datasource::{
file_format::{csv::CsvFormat, parquet::ParquetFormat},
@@ -42,8 +40,6 @@ use datafusion_common::stats::Precision;
use datafusion_common::test_util::batches_to_sort_string;
use datafusion_common::ScalarValue;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::{col, lit, Expr, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};

use async_trait::async_trait;
use bytes::Bytes;
@@ -57,55 +53,6 @@ use object_store::{
use object_store::{Attributes, MultipartUpload, PutMultipartOpts, PutPayload};
use url::Url;

#[tokio::test]
async fn parquet_partition_pruning_filter() -> Result<()> {
let ctx = SessionContext::new();

let table = create_partitioned_alltypes_parquet_table(
&ctx,
&[
"year=2021/month=09/day=09/file.parquet",
"year=2021/month=10/day=09/file.parquet",
"year=2021/month=10/day=28/file.parquet",
],
&[
("year", DataType::Int32),
("month", DataType::Int32),
("day", DataType::Int32),
],
"mirror:///",
"alltypes_plain.parquet",
)
.await;

// The first three filters can be resolved using only the partition columns.
let filters = [
Expr::eq(col("year"), lit(2021)),
Expr::eq(col("month"), lit(10)),
Expr::eq(col("day"), lit(28)),
Expr::gt(col("id"), lit(1)),
];
let exec = table.scan(&ctx.state(), None, &filters, None).await?;
let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
if let Some((_, parquet_config)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
let pred = parquet_config.predicate().unwrap();
// Only the last filter should be pushdown to TableScan
let expected = Arc::new(BinaryExpr::new(
Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
));

assert!(pred.as_any().is::<BinaryExpr>());
let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();

assert_eq!(pred, expected.as_ref());
}
Ok(())
}

#[tokio::test]
async fn parquet_distinct_partition_col() -> Result<()> {
let ctx = SessionContext::new();
2 changes: 0 additions & 2 deletions datafusion/datasource-avro/src/file_format.rs
@@ -37,7 +37,6 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_session::Session;

@@ -150,7 +149,6 @@ impl FileFormat for AvroFormat {
&self,
_state: &dyn Session,
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let config = FileScanConfigBuilder::from(conf)
.with_source(self.file_source())