Per file filter evaluation #15057

Open · adriangb wants to merge 7 commits into main
Conversation

@adriangb (Contributor) commented on Mar 7, 2025

A step towards #14993.

I decided to tackle filter pushdown first because:

  • A lot of the benefit of expression pushdown comes from filter pushdown since filters often remove large chunks of work from a query.
  • Filter pushdown already has a lot of the machinery in place, whereas projection pushdown requires a lot more API design.

The idea is to experiment with filters first because it's less work, and later re-use FileExpressionRewriter to do projection pushdown once we've fleshed out the details and applied the learnings from this piece of work.

github-actions bot added the core (Core DataFusion crate) and datasource (Changes to the datasource crate) labels on Mar 7, 2025
adriangb marked this pull request as draft on March 7, 2025
@adriangb (Contributor, Author) commented on Mar 7, 2025

The example is not working yet. It fails with `Error: Shared(ArrowError(ExternalError(External(ComputeError("Error evaluating filter predicate: Internal(\"PhysicalExpr Column references column '_user_info.age' at index 0 (zero-based) but input schema only has 0 columns: []\")"))), None))`. I think some work will be needed on how the rewrites interact with projections for the filters.

@adriangb (Contributor, Author) commented on Mar 7, 2025

OK, the example is now working. I think the overall approach is interesting, but I don't think it's quite a workable solution yet.

adriangb marked this pull request as ready for review on March 10, 2025
@adriangb (Contributor, Author) commented

The example is now working and even does stats pruning of shredded columns 🚀

let parquet_source = ParquetSource::default()
    .with_predicate(self.schema.clone(), filter)
    .with_pushdown_filters(true)
    .with_filter_expression_rewriter(Arc::new(StructFieldRewriter) as _);
@adriangb (Contributor, Author): This is the API for users to attach this rewriter to their plan.

Comment on lines 298 to 357
struct StructFieldRewriterImpl {
    file_schema: SchemaRef,
}

impl TreeNodeRewriter for StructFieldRewriterImpl {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(
        &mut self,
        expr: Arc<dyn PhysicalExpr>,
    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
        if let Some(scalar_function) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
            if scalar_function.name() == "get_field" {
                if scalar_function.args().len() == 2 {
                    // First argument is the column, second argument is the field name
                    let column = scalar_function.args()[0].clone();
                    let field_name = scalar_function.args()[1].clone();
                    if let Some(literal) =
                        field_name.as_any().downcast_ref::<expressions::Literal>()
                    {
                        if let Some(field_name) = literal.value().try_as_str().flatten() {
                            if let Some(column) =
                                column.as_any().downcast_ref::<expressions::Column>()
                            {
                                let column_name = column.name();
                                let source_field =
                                    self.file_schema.field_with_name(column_name)?;
                                let expected_flattened_column_name =
                                    format!("_{}.{}", column_name, field_name);
                                // Check if the flattened column exists in the file schema and has the same type
                                if let Ok(shredded_field) = self
                                    .file_schema
                                    .field_with_name(&expected_flattened_column_name)
                                {
                                    if source_field.data_type() == shredded_field.data_type() {
                                        // Rewrite the expression to use the flattened column
                                        let rewritten_expr = expressions::col(
                                            &expected_flattened_column_name,
                                            &self.file_schema,
                                        )?;
                                        return Ok(Transformed::yes(rewritten_expr));
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(Transformed::no(expr))
    }
}
@adriangb (Contributor, Author): Example implementation of a rewriter.

@adriangb (Contributor, Author) commented

@alamb I think this is ready for a first round of review when you have a chance!

xudong963 self-requested a review on March 11, 2025
@adriangb (Contributor, Author) commented

The main issue I've found with this approach is marking filters as Exact or Inexact.
In particular, unless you mark them as Exact, DataFusion will still need to pull the possibly large unshredded data to re-apply the filters in a FilterExec. This doesn't completely kill performance, because if the filter is selective there is less data to re-filter, but the worst case is possibly worse than not having this feature at all. I feel like this is a consequence of filter pushdown in general, though: ignoring this change, if I have a TableProvider that returns a DataSourceExec and filter pushdown is enabled, should I be marking all of my filters as Exact? That seems dangerous given that it's not documented anywhere that filter pushdown supports all of the filters that FilterExec does, and given things like:

#[inline]
fn prevents_pushdown(&self) -> bool {
    self.non_primitive_columns || self.projected_columns
}
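
For illustration, the marking comes down to what TableProvider::supports_filters_pushdown reports. Here is a minimal, deliberately conservative sketch of such a policy; the classify_filters helper and its rules are hypothetical, not DataFusion's built-in logic:

use datafusion::logical_expr::{Expr, Operator, TableProviderFilterPushDown};

/// Hypothetical policy: only simple `column <op> literal` comparisons are
/// reported as Exact; everything else is Inexact, so DataFusion keeps a
/// FilterExec above the scan to re-apply it.
fn classify_filters(filters: &[&Expr]) -> Vec<TableProviderFilterPushDown> {
    filters
        .iter()
        .map(|filter| match filter {
            Expr::BinaryExpr(binary)
                if matches!(
                    binary.op,
                    Operator::Eq | Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq
                ) && matches!(
                    (binary.left.as_ref(), binary.right.as_ref()),
                    (Expr::Column(_), Expr::Literal(..)) | (Expr::Literal(..), Expr::Column(_))
                ) =>
            {
                TableProviderFilterPushDown::Exact
            }
            _ => TableProviderFilterPushDown::Inexact,
        })
        .collect()
}

A TableProvider would return this Vec from supports_filters_pushdown; anything reported as Inexact keeps a FilterExec above the scan.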

@adriangb (Contributor, Author) commented

Okay I think I can answer my own question: https://github.com/pydantic/datafusion/blob/38356998059a2d08113401ea8111f238899ab0b8/datafusion/core/src/datasource/listing/table.rs#L961-L995

Based on this it seems like it's safe to mark filters as exact if they are getting pushed down 😄

@adriangb (Contributor, Author) commented

Okay folks, sorry for the churn; I thought this was in a better state than it ended up being.

I've now reworked it to minimize the diff and make sure all existing tests pass. I'm going to add tests for the new functionality now to complement the example.

Comment on lines 96 to 103
// Note about schemas: we are actually dealing with _4_ different schemas here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
// - The filter schema: a hybrid of the virtual file schema and the physical file schema.
// If a filter is rewritten to reference columns that are in the physical file schema but not the virtual file schema, we need to add those columns to the filter schema so that the filter can be evaluated.
// This schema is generated by taking any columns from the virtual file schema that are referenced by the filter and adding any columns from the physical file schema that are referenced by the filter but not in the virtual file schema.
// Columns from the virtual file schema are added in the order they appear in the virtual file schema.
// The columns from the physical file schema are always added to the end of the schema, in the order they appear in the physical file schema.
//
// I think it might be wise to do some renaming of parameters where possible, e.g. rename `file_schema` to `table_schema_without_partition_columns` and `physical_file_schema` or something like that.
@adriangb (Contributor, Author): This is an interesting bit to ponder upon.
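
A rough sketch of the construction that comment describes, assuming the set of column names referenced by the (already rewritten) filter has been collected separately; the function and parameter names are illustrative, not the PR's actual code:

use std::collections::HashSet;
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use datafusion::error::Result;

/// Build the hybrid "filter schema": referenced columns from the virtual file
/// schema first (in virtual schema order), then any referenced columns that
/// exist only in the physical file (e.g. shredded variant columns), appended
/// in physical schema order.
fn build_filter_schema(
    virtual_file_schema: &SchemaRef,
    physical_file_schema: &SchemaRef,
    referenced: &HashSet<String>,
) -> Result<SchemaRef> {
    let mut fields = Vec::new();
    for field in virtual_file_schema.fields().iter() {
        if referenced.contains(field.name()) {
            fields.push(field.clone());
        }
    }
    for field in physical_file_schema.fields().iter() {
        if referenced.contains(field.name())
            && virtual_file_schema.field_with_name(field.name()).is_err()
        {
            fields.push(field.clone());
        }
    }
    Ok(Arc::new(Schema::new(fields)))
}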

Comment on lines 25 to 34
/// Rewrite an expression to take into account this file's particular schema.
/// This can be used to evaluate expressions against shredded variant columns or columns that pre-compute expressions (e.g. `day(timestamp)`).
pub trait FileExpressionRewriter: Debug + Send + Sync {
    /// Rewrite an expression in the context of a file schema.
    fn rewrite(
        &self,
        file_schema: SchemaRef,
        expr: Arc<dyn PhysicalExpr>,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}
@adriangb (Contributor, Author): Note: if users need the table_schema they can bind that inside of TableProvider::scan.
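
For example, the StructFieldRewriter attached via with_filter_expression_rewriter earlier could plausibly implement this trait by delegating to the TreeNodeRewriter shown above. A sketch, assuming FileExpressionRewriter and StructFieldRewriterImpl are in scope; this is illustrative, not the PR's exact code:

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::TreeNode;
use datafusion::error::Result;
use datafusion::physical_expr::PhysicalExpr;

#[derive(Debug)]
struct StructFieldRewriter;

impl FileExpressionRewriter for StructFieldRewriter {
    fn rewrite(
        &self,
        file_schema: SchemaRef,
        expr: Arc<dyn PhysicalExpr>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Walk the expression with this file's schema and return the
        // (possibly) rewritten expression.
        let transformed = expr.rewrite(&mut StructFieldRewriterImpl { file_schema })?;
        Ok(transformed.data)
    }
}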

@alamb (Contributor) commented on Mar 15, 2025

I will try and give this a look over the next few days

@adriangb (Contributor, Author) commented on Apr 13, 2025

I would like to resume this work.

Some thoughts: should the rewrite happen via a new trait, as I'm currently doing, or should we add a method like PhysicalExpr::with_schema?
If we add with_schema, what schema do we pass it? The actual file schema? There's something to be said for that: it could rewrite filters to cast the literals / filters instead of casting the columns/arrays as is currently done, which should be cheaper. I expect that any time it was okay to cast the data it was also okay to cast the predicate itself. It could also absorb the work of reassign_predicate_columns (we implement it for Column such that if its index doesn't match but another one does, it swaps).

I suspect the hard bit with this approach will be edge cases: what if a filter cannot adapt itself to the file schema, but we could cast the column to make it work? I'm thinking of something like a UDF that only accepts Utf8 but the file produces Utf8View 🤔
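
For concreteness, the reassign_predicate_columns behavior mentioned above amounts to something like the following sketch: re-bind each Column's index by looking its name up in the target schema (illustrative only, not the existing implementation):

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;

/// Re-bind column indices in `expr` against `schema`, matching by name.
fn rebind_columns(
    expr: Arc<dyn PhysicalExpr>,
    schema: &SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform(|e| {
        if let Some(column) = e.as_any().downcast_ref::<Column>() {
            let index = schema.index_of(column.name())?;
            if index != column.index() {
                // Same column name, different position in this schema: swap it.
                return Ok(Transformed::yes(
                    Arc::new(Column::new(column.name(), index)) as Arc<dyn PhysicalExpr>
                ));
            }
        }
        Ok(Transformed::no(e))
    })
    .map(|t| t.data)
}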

I think @jayzhan211 proposed something similar in https://github.com/apache/datafusion/pull/15685/files#diff-2b3f5563d9441d3303b57e58e804ab07a10d198973eed20e7751b5a20b955e42.

@alamb any thoughts?

@jayzhan211 (Contributor) commented

> PhysicalExpr::with_schema

This method is too general, and it is unclear what we need to do with the provided schema for each PhysicalExpr, so it is not a good idea.

> I suspect the hard bit with this approach will be edge cases: what if a filter cannot adapt itself to the file schema, but we could cast the column to make it work? I'm thinking of something like a UDF that only accepts Utf8 but the file produces Utf8View

I think it is unavoidable that we need to cast the columns to be able to evaluate the filter.

Another question is, isn't the filter created based on the table schema? And then the batch is read with the file schema, cast to the table schema, and evaluated by the filter. What we could do is rewrite the filter based on the file schema. Assume we have cast(a, i64) = 100, where a is i32 in the table schema and i64 in the file schema. We rewrite it to cast(cast(a, i32), i64) = 100 and then optimize it to a = 100. In your example where the UDF only accepts Utf8, we know there is no optimization we can do, so we just end up with the additional casting from file schema to table schema.
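
A sketch of what that cast-elimination rule could look like as a rewrite over the physical filter expression. The UnwrapFileCasts name is hypothetical, and a real version would also need to re-bind column indices against the schema the filter is evaluated with:

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion::error::Result;
use datafusion::physical_expr::expressions::{CastExpr, Column};
use datafusion::physical_expr::PhysicalExpr;

/// If `cast(col, T)` is applied to a column whose type in this file is
/// already `T`, drop the cast so the predicate runs directly on the file's
/// data (e.g. cast(a, i64) = 100 becomes a = 100 when `a` is i64 in the file).
struct UnwrapFileCasts {
    physical_file_schema: SchemaRef,
}

impl TreeNodeRewriter for UnwrapFileCasts {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(
        &mut self,
        expr: Arc<dyn PhysicalExpr>,
    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
        if let Some(cast) = expr.as_any().downcast_ref::<CastExpr>() {
            if let Some(column) = cast.expr().as_any().downcast_ref::<Column>() {
                if let Ok(file_field) =
                    self.physical_file_schema.field_with_name(column.name())
                {
                    if file_field.data_type() == cast.cast_type() {
                        // The file already stores this column with the cast's
                        // target type, so the cast is redundant for this file.
                        return Ok(Transformed::yes(cast.expr().clone()));
                    }
                }
            }
        }
        Ok(Transformed::no(expr))
    }
}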

@adriangb (Contributor, Author) commented on Apr 14, 2025

> Another question is, isn't the filter created based on the table schema? And then the batch is read with the file schema, cast to the table schema, and evaluated by the filter.

Yes this is exactly the case.

> What we could do is rewrite the filter based on the file schema. Assume we have cast(a, i64) = 100, where a is i32 in the table schema and i64 in the file schema. We rewrite it to cast(cast(a, i32), i64) = 100 and then optimize it to a = 100.

Yes that is exactly what I am proposing above, thank you for giving a more concrete example.

The other point is whether we can use this same mechanism to handle shredding for the variant type. In other words, can we "optimize" variant_get(col, 'key') to col.typed_value.key if we know from the file schema that key is shredded for this specific file?

And if that all makes sense... how do we do those optimizations? Is it something like an optimizer that has to downcast match on the expressions, or do we add methods to PhysicalExpr for each expression to describe how it handles this behavior?

@jayzhan211 (Contributor) commented on Apr 14, 2025

> The other point is whether we can use this same mechanism to handle shredding for the variant type. In other words, can we "optimize" variant_get(col, 'key') to col.typed_value.key if we know from the file schema that key is shredded for this specific file?

Probably

> And if that all makes sense... how do we do those optimizations? Is it something like an optimizer that has to downcast match on the expressions, or do we add methods to PhysicalExpr for each expression to describe how it handles this behavior?

This likely only applies to the parquet filter, so we can rewrite the filter once we know the filter + file_schema + table_schema (probably in build_row_filter). We don't need an optimizer rule or trait method unless this rule could be applied more generally.

@adriangb (Contributor, Author) commented

> This likely only applies to the parquet filter, so we can rewrite the filter once we know the filter + file_schema + table_schema (probably in build_row_filter). We don't need an optimizer rule or trait method unless this rule could be applied more generally.

Yes agreed, that's basically what's in this PR currently: a custom trait to implement an optimizer pass with all of that information available.

My questions are:

  1. Is this the right API, or should some of the information on how to optimize / adapt to a specific file schema live in PhysicalExpr::with_file_schema or something like that?
  2. Can we use this mechanism both for variant_get(col, 'key') -> col.typed_value.key as well as cast(a, i64) = 100 -> cast(cast(a,i32),i64) = 100 -> a = 100 or is that mixing too many things into the same API?

@jayzhan211 (Contributor) commented

> Is this the right API, or should some of the information on how to optimize / adapt to a specific file schema live in PhysicalExpr::with_file_schema or something like that?

Rewriting with the file schema is specialized to filters. If you add PhysicalExpr::with_file_schema, that means you may expect other kinds of rewrites for other PhysicalExprs; I don't think we need this so far.

> Can we use this mechanism both for variant_get(col, 'key') -> col.typed_value.key as well as cast(a, i64) = 100 -> cast(cast(a,i32),i64) = 100 -> a = 100 or is that mixing too many things into the same API?

Makes sense to me to have many rules inside the filter_rewrite_with_file_schema logic, as long as the rewrite leverages the provided file schema.

Comment on lines +60 to +61
/// Schema of the file as far as the rest of the system is concerned.
pub logical_file_schema: SchemaRef,
@adriangb (Contributor, Author): I think this rename is worth it - there's been constant confusion even amongst maintainers about this. And this is only public for internal use.
