Evaluate filter pushdown against the physical schema for performance and correctness #15780

Open · adriangb opened this issue Apr 20, 2025 · 11 comments
Labels
bug (Something isn't working), help wanted (Extra attention is needed)

Comments

@adriangb
Contributor

adriangb commented Apr 20, 2025

Describe the bug

Consider the following test:

COPY  (
  SELECT arrow_cast(a, 'Int16') AS a
  FROM ( VALUES (1), (2), (3) ) AS t(a)
)  TO 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet'
STORED AS PARQUET;

set datafusion.execution.parquet.pushdown_filters = true;

CREATE EXTERNAL TABLE t_pushdown(a int) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';

select * from t_pushdown where a = arrow_cast(2, 'Int8');

At some point DataFusion optimizes the Int8 filter by casting the filter to Int32 (matching the table schema, thus avoiding having to cast the column).

So when the filter gets into ParquetSource it's an Int32 filter. But when we read the file schema it's actually an Int8! Since we now build pruning predicates, etc. on a per-file basis using the physical file schema, this can introduce casting of the data from Int8 to Int32, which is unnecessary because (1) we could cast the filter instead, which would be much cheaper, and (2) if the file type and filter type were both Int8 or Int16, as in this example (as might happen if one changes the table schema but not old data or old queries), we would actually be closer to the original intent of the query.

To be clear, I do not mean that this is a new regression. I believe this has always been the case, but now we can actually fix it, whereas before we could not.

This applies not only to stats filtering (where the impact is likely negligible) but also to predicate pushdown, where I expect the impact may be much larger, especially for cases where we never end up materializing the columns (and thus don't have to cast them to the table's data type at all). I don't know of any benchmark that measures this case at the moment, though.

To resolve this I think we just need to call optimize_casts(physical_expr, physical_file_schema) (a made-up function), but I don't know where or how optimize_casts exists (I feel like it must already exist, maybe at the logical expr level?). Does anyone know where this exists?
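
For illustration, here is a minimal sketch of what such a rewrite could do, written with invented stand-in types (Expr, DataType) rather than DataFusion's real PhysicalExpr API; it pins down the intent, not the actual implementation:

// Hypothetical sketch only: `Expr` and `DataType` are invented stand-ins,
// not DataFusion's real physical expression types.
#[derive(Clone, Debug, PartialEq)]
enum DataType { Int8, Int32 }

#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column { name: String, file_type: DataType },
    LiteralI32(i32),
    LiteralI8(i8),
    Cast { inner: Box<Expr>, to: DataType },
    Eq(Box<Expr>, Box<Expr>),
}

/// Rewrite `cast(col, Int32) = <Int32 literal>` into `col = <Int8 literal>`
/// when the column's *file* type is Int8 and the literal fits in Int8. This
/// moves the cast off the data (per row) and onto the literal (once).
fn optimize_casts(expr: Expr) -> Expr {
    if let Expr::Eq(lhs, rhs) = &expr {
        if let (Expr::Cast { inner, to: DataType::Int32 }, Expr::LiteralI32(v)) =
            (lhs.as_ref(), rhs.as_ref())
        {
            if let Expr::Column { file_type: DataType::Int8, .. } = inner.as_ref() {
                if let Ok(small) = i8::try_from(*v) {
                    return Expr::Eq(inner.clone(), Box::new(Expr::LiteralI8(small)));
                }
                // Literal out of range for Int8: the equality can never hold,
                // so a fuller version could fold the whole predicate to false.
            }
        }
    }
    expr
}

fn main() {
    // Table-level filter: cast(a, Int32) = 2, with `a` stored as Int8 in the file.
    let col = Expr::Column { name: "a".into(), file_type: DataType::Int8 };
    let filter = Expr::Eq(
        Box::new(Expr::Cast { inner: Box::new(col), to: DataType::Int32 }),
        Box::new(Expr::LiteralI32(2)),
    );
    println!("{:?}", optimize_casts(filter)); // Eq(Column { name: "a", .. }, LiteralI8(2))
}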

@adriangb adriangb added the bug label Apr 20, 2025
@james-ryans

I would love to work on this task

@adriangb
Contributor Author

I can confirm this is currently being done at the LogicalPlan level. I'd say the first step is to understand how it happens there, then check whether something similar exists for PhysicalExpr and, if it doesn't, create it.

@leoyvens
Contributor

To understand how this happens in the logical optimizer, as part of the SimplifyExpressions pass, you can look at unwrap_cast.rs.

@alamb
Contributor

alamb commented May 5, 2025

I think the first thing to do would be to try to write some tests that show the error happening.

Perhaps we could use the existing statistics: predicate_evaluation_errors
https://github.com/apache/datafusion/blob/6d5e00ad3f8e53f7252cb1d3c72a6c7f28c1aed6/datafusion/datasource-parquet/src/metrics.rs#L31-L30

Here is an example test that shows how to use those statistics: https://github.com/search?q=repo%3Aapache%2Fdatafusion%20predicate_evaluation_errors&type=code

assert_eq!(output.predicate_evaluation_errors(), Some(0));

assert_eq!(output.predicate_evaluation_errors(), expected_errors);

@adriangb
Contributor Author

@alamb there is no error AFAIK. It currently works, but it works by casting the data to match the types of the table. The point I’m making is that we could instead cast the expression to match the type of the file, possibly saving a lot of copying / blowing up dictionaries.

@adriangb
Contributor Author

adriangb commented May 21, 2025

As discussed a bit in #16086 (comment), there is a fundamental problem: all of the predicates are planned at the table level.

So for example, take the predicate func(col1), where at the table / logical level the type is col1: Utf8View.
If the file type is col1: Utf8, then how do we know that func(Utf8) won't error?
There are other cases... basically, it's not clear to me that getting there is trivial; I worry we have to re-invent casting rules at the physical layer.
I remember @findepi said somewhere something along the lines of "casting should not be changed after planning" (please correct me or feel free to chime in).

My feeling is that we eventually have to do this, for performance / reducing extra work, to add important new features, and for correctness reasons, but evidently my initial attempt was too naive, so we had to revert it.
I am willing to try again later, especially now that we have some tests in place (we need more, e.g. for the function example above).
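
To make the function hazard concrete, here is a tiny sketch with invented names (not DataFusion's actual function-resolution machinery): a function planned against the table type may simply not accept the file-level type, so any rewrite has to consult signatures, which is exactly the coercion-like logic worried about above.

/// Invented stand-ins for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DataType {
    Utf8,
    Utf8View,
}

/// A scalar function records which input types it can evaluate.
struct ScalarFunc {
    name: &'static str,
    accepted: &'static [DataType],
}

/// Rewriting `func(cast(col, table_type))` to `func(col)` is only sound if
/// `func` also accepts the raw file-level type; otherwise the cast must stay.
fn can_unwrap_cast(func: &ScalarFunc, file_type: DataType) -> bool {
    func.accepted.contains(&file_type)
}

fn main() {
    let func = ScalarFunc { name: "func", accepted: &[DataType::Utf8View] };
    // Table type is Utf8View, file type is Utf8: the cast cannot be removed.
    assert!(!can_unwrap_cast(&func, DataType::Utf8));
    println!("{} must keep the cast for Utf8 files", func.name);
}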

@adriangb adriangb changed the title from "Unnecessary casting in stats & filter evaluation" to "Evaluate filter pushdown against the physical schema for performance and correctness" May 21, 2025
@findepi
Member

findepi commented May 22, 2025

"casting should not be changed after planning"

If I said exactly that, I stand corrected.
Coercion is something that should be applied during the analysis/initial planning phase. Coercion rules result in casts being inserted into the plan. After the initial plan is fully formed, the word "coercion" does not exist anymore.

Casts are in the same category as function calls -- the optimizer may reorganize or replace function calls with other expressions as long as they are equivalent (and are believed to "be better"). Casts can be removed or replaced the same way (again: as long as the resulting expression is well formed and equivalent).

from the issue description:

So when the filter gets into ParquetSource it's an Int32 filter. But when we read the file schema it's actually an Int8!

Where does Int8 come from?

Anyway, as the example shows, two different files may have two different internal representations of the same SQL-level column. I.e. the table may declare Int64, but the file may contain Int32 or Int16. (This is not limited to various Int widths.)
The Parquet source, which deals with individual files, may perform logic similar to the unwrap_cast optimizer.
Does it matter though?

@adriangb
Contributor Author

"casting should not be changed after planning"

If I said exactly that, I stand corrected. Coercion is something that should be applied during the analysis/initial planning phase. Coercion rules result in casts being inserted into the plan. After the initial plan is fully formed, the word "coercion" does not exist anymore.

Casts are in the same category as function calls -- the optimizer may reorganize or replace function calls with other expressions as long as they are equivalent (and are believed to "be better"). Casts can be removed or replaced the same way (again: as long as the resulting expression is well formed and equivalent).

Thanks for correcting me! That's exactly the sort of distinction I knew you'd be able to make and that I was lacking. It's a helpful way to think about it.

from the issue description:

So when the filter gets into ParquetSource it's an Int32 filter. But when we read the file schema it's actually an Int8!

Where does Int8 come from?
Anyway, as the example shows, two different files may have two different internal representations of the same SQL-level column. I.e. the table may declare Int64, but the file may contain Int32 or Int16. (This is not limited to various Int widths.) The Parquet source, which deals with individual files, may perform logic similar to the unwrap_cast optimizer. Does it matter though?

That's the point: we need to do logic similar to unwrap_cast, which is non-trivial, I think.
I started down that road and got confused about some cases so I backed out.
For example, if you have a table schema of col1: Int64, col2: Int64 and a predicate col1 = col2, there will be no casts at the logical level. But when you get to the file level, you may have the schema col1: Int8, col2: UInt32; now you have to do something more like coercion, I think (i.e. introduce some casts)? What would you suggest we do in this case? (A sketch of one option follows below.)

Basically, I think we need to all agree that this complexity is the right way to go, and then agree on what to do in the different scenarios.
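
One option, sketched here with invented types as a design possibility rather than existing DataFusion code: unwrap casts per column only where it is provably safe, and otherwise keep casting up to the table type, so that with file types Int8 and UInt32 the predicate stays cast(col1, Int64) = cast(col2, Int64).

// Invented illustration of a per-comparison fallback rule.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DataType {
    Int8,
    UInt32,
    Int64,
}

/// Pick the type a binary comparison is evaluated in: if both sides already
/// share a file-level type, compare natively; otherwise fall back to the
/// table-level type and keep the casts (mirroring what coercion produced).
fn comparison_type(lhs_file: DataType, rhs_file: DataType, table_type: DataType) -> DataType {
    if lhs_file == rhs_file { lhs_file } else { table_type }
}

fn main() {
    // col1: Int8 and col2: UInt32 in the file, both Int64 at the table level:
    // no shared narrower type, so both columns are still cast up to Int64.
    assert_eq!(
        comparison_type(DataType::Int8, DataType::UInt32, DataType::Int64),
        DataType::Int64
    );
}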

@findepi
Member

findepi commented May 22, 2025

It's very natural to think of file-level vs table-level as the same thing as SQL coercions, but there is an important distinction: SQL has its own semantics, and the table provider has its own semantics.
Making this distinction is easier to understand in systems where it's not Arrow everywhere and the SQL side and the table provider side are cleanly delimited by their different type systems.

From Parquet to the table level -- the semantics of this operation are defined by a read. What happens if the file has col1: Int8 but the table defines it as Int32? Well, nothing unusual: Int8 is widened to Int32 (infallibly). There is "a cast" (an equivalent of a SQL cast) happening inside the table provider. If a query comes with a filter (in Int32 terms), the filter may be translated to col1's terms by an equivalent of the unwrap-cast optimization (yes, separate code).
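
For the Int8-in-file / Int32-at-table example, the read-side equivalent of unwrap-cast could behave like this sketch (invented names): either compare natively in Int8, or fold the comparison to a constant when the literal is out of range for the file type, without ever widening the column data.

/// Invented stand-in for the rewritten, file-level filter.
#[derive(Debug, PartialEq)]
enum FileFilter {
    EqInt8(i8),  // evaluate directly against the Int8 column, no widening
    AlwaysFalse, // the table-level literal cannot occur in this file's type
}

/// Translate a table-level `col1 = <Int32 literal>` into file-level terms.
fn translate_eq_to_file(table_literal: i32) -> FileFilter {
    match i8::try_from(table_literal) {
        Ok(v) => FileFilter::EqInt8(v),
        Err(_) => FileFilter::AlwaysFalse,
    }
}

fn main() {
    assert_eq!(translate_eq_to_file(2), FileFilter::EqInt8(2));
    // `col1 = 1000` can never match an Int8 column: prune without reading it.
    assert_eq!(translate_eq_to_file(1000), FileFilter::AlwaysFalse);
}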

@adriangb
Contributor Author

If a query comes with a filter (in Int32 terms), the filter may be translated to col1's terms by an equivalent of the unwrap-cast optimization (yes, separate code).

What about the situation described above? What happens now is basically that both columns get cast to Int64 to match their table-level types. But that's at least one extra cast. And there are cases, like structs with extra fields, where I don't think it makes sense to do any cast: if the file type has extra fields, it's compatible with any function that operates on a table type that has a subset of those fields.
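
A sketch of the struct point, with field names standing in for full Arrow field metadata (illustration only): if the file's struct carries a superset of the table's fields, compatibility is just a field-subset check plus a projection, and rebuilding the struct via a cast is pure overhead.

/// The file-level struct can serve anywhere the table-level struct is
/// expected if it carries every field the table asks for; extra fields are
/// simply not projected, and no cast of the data is needed.
fn struct_is_compatible(file_fields: &[&str], table_fields: &[&str]) -> bool {
    table_fields.iter().all(|f| file_fields.contains(f))
}

fn main() {
    let file = ["a", "b", "c"]; // file struct has an extra field "c"
    let table = ["a", "b"];
    assert!(struct_is_compatible(&file, &table));
}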

@alamb
Contributor

alamb commented May 23, 2025

What about the situation described above? What happens now is basically that both columns get cast to Int64 to match their table-level types. But that's at least one extra cast. And there are cases, like structs with extra fields, where I don't think it makes sense to do any cast: if the file type has extra fields, it's compatible with any function that operates on a table type that has a subset of those fields.

I predict that the 'extra fields' use case is going to be the one that is most important and will drive this feature (as performance will be disastrous without it).
