refactor filter pushdown apis #15801


Open: wants to merge 11 commits into main from filter-pushdown-change

Conversation

@adriangb (Contributor) commented Apr 22, 2025

This improves over the existing APIs by:

  1. Not requiring downcast matching of specific ExecutionPlans (as discussed previously, this could be resolved by adding a new method, but in the end both approaches require two methods).
  2. Doing less invasive modification of the plans (e.g. not popping and then re-inserting FilterExecs).
  3. Reducing the algorithmic complexity of the optimizer rule (it's now a single pass over the tree for all cases, and avoids cloning unless strictly needed).
  4. Allowing plans to decide how any leftover filters, whether from parents or generated by the plan itself, should be handled (I think this will be useful for TopK).

This is in some ways simpler and in some ways more complex than what we have right now. The difference is not large in terms of LOC; I think the rest boils down to personal preference.

I didn't use the TreeNode APIs; maybe there's a way to do that using transform_down_up or something, but I couldn't figure it out. In any case, that seems like an internal refactor that can be done later, as long as the manual implementation passes tests and the ExecutionPlan APIs stay the same.
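
For reference, the TreeNode combinator mentioned above looks roughly like this (a sketch of how it might be wired up, not code from this PR; the closure bodies are placeholders):

use datafusion_common::tree_node::{Transformed, TreeNode};

// Sketch only: transform_down_up walks the plan once, invoking the first
// closure on the way down and the second on the way back up.
let plan = plan
    .transform_down_up(
        |node| Ok(Transformed::no(node)), // down: gather filters toward the leaves
        |node| Ok(Transformed::no(node)), // up: react to what the children accepted
    )?
    .data;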

@github-actions github-actions bot added optimizer Optimizer rules core Core DataFusion crate datasource Changes to the datasource crate labels Apr 22, 2025
@adriangb adriangb marked this pull request as ready for review April 22, 2025 17:59
@adriangb (Contributor, Author)

cc @berkaysynnada

@berkaysynnada (Contributor)

> 1. Not requiring downcast matching of specific ExecutionPlans (as discussed previously, this could be resolved by adding a new method, but in the end both approaches require two methods).

We can actually do that with the current version in a very similar way, and I think the API should just look like:

/// Gets the fetch count for the operator. `None` means there is no fetch.
fn fetch(&self) -> Option<usize> {
    None
}

Similarly, we could just define fn filter(&self) -> Option<Arc<dyn PhysicalExpr>>
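
For illustration, here is roughly how an optimizer rule's check would change with such an accessor (a sketch assuming filter() became a default method on ExecutionPlan; it is not part of the current API):

// Today: the rule must downcast to each concrete operator.
if let Some(filter_exec) = plan.as_any().downcast_ref::<FilterExec>() {
    let predicate = Arc::clone(filter_exec.predicate());
    // ... try to push `predicate` toward the leaves ...
}

// With the proposed accessor: any operator can advertise a filter.
if let Some(predicate) = plan.filter() {
    // ... same pushdown logic, no downcasting ...
}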

> 2. Doing less invasive modification of the plans (e.g. not popping and then re-inserting FilterExecs).

That’s kind of the nature of transform APIs. I don’t think it’s a big deal. When the algorithm encounters a FilterExec, it either tries to push it down, or takes a conservative path and only removes it when it’s certain that it should be removed. Either way, we're biasing the rule, and if the outcome doesn’t go our way, the cost is just a small overhead.
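
For example, a successful pushdown looks schematically like this (plan displays in the style of the test snapshots further down; the plans here are invented for illustration):

input:
  - FilterExec: a@0 = foo
  -   DataSourceExec: pushdown_supported=true
output:
  - DataSourceExec: pushdown_supported=true, predicate=a@0 = foo

The FilterExec is only removed for good once the source absorbs the predicate; the overhead under discussion is the temporary removal and re-insertion while that decision is being made.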

> 3. Reducing the algorithmic complexity of the optimizer rule (it's now a single pass over the tree for all cases, and avoids cloning unless strictly needed).

The filter() API also enables us to convert the current algorithm to a single-pass one.

> 4. Allowing plans to decide how any leftover filters, whether from parents or generated by the plan itself, should be handled (I think this will be useful for TopK).

That’s still possible with the current approach.

> This is in some ways simpler and in some ways more complex than what we have right now. The difference is not large in terms of LOC; I think the rest boils down to personal preference.

What you’ve done here works very well, but it seems a bit more complicated, and I think folks who aren’t deep in this context might find it harder to follow. As I mentioned, I prefer to make things more explicit and easier to understand in these early implementation stages. Once the requirements and implementations are stable, we can shift focus to optimizations.

I'm planning to open a PR which brings in the revisit recursion variant and introduces the filter() API. With those, I hope to address your concerns. If I'm not getting something wrong, these issues don't block any implementation; you are only worried about some design nuances and performance?

@adriangb (Contributor, Author) commented Apr 24, 2025

> As I mentioned, I prefer to make things more explicit and easier to understand in these early implementation stages. Once the requirements and implementations are stable, we can shift focus to optimizations.

I totally agree; we are 100% aligned on this. I think where we may disagree is what "simpler" means. It's obviously somewhat subjective, but the objective measures I'm trying to look at are:

  • Lines of code: both implementations seem to be about the same. If anything, this implementation is smaller: this PR is currently +160 LOC, but I think that can be brought down with some cleanup, and it introduces 1 new method that the current implementation would also have to contend with.
  • Touch points: the current implementation requires knowledge of the helpers being used, e.g. to set the revisit parameter. Maybe some of that can be solved, but I still feel the public APIs are more tightly bound to the implementation of the optimizer pass than the implementation in this PR is.
  • Functionality: this PR solves the problems stated in the description, which I think we both agree are real. As you say, there may be other ways to solve those with the current implementation, but I'd fear that makes the points above worse (more lines of code, more methods in the public APIs, more linkage between them), especially if it's introducing both fetch() and filter().

> We can actually do that with the current version in a very similar way, and I think the API should just look like:
> Similarly, we could just define fn filter(&self) -> Option<Arc<dyn PhysicalExpr>>

I don't understand where these new methods would go or how they would help. When methods are proposed, it would help me if the trait or struct they are intended to go on were named, and maybe an example of them being called were included as well.

> Doing less invasive modification of the plans (e.g. not popping and then re-inserting FilterExecs).

> That’s kind of the nature of transform APIs. I don’t think it’s a big deal. When the algorithm encounters a FilterExec, it either tries to push it down, or takes a conservative path and only removes it when it’s certain that it should be removed. Either way, we're biasing the rule, and if the outcome doesn’t go our way, the cost is just a small overhead.

There have been efforts in the past to minimize the amount of cloning / copying during optimizer passes; since DataFusion does not have prepared statements, optimizer passes can end up being a non-trivial portion of query time for systems with optimized caching, etc. And I'm not sure I would call optimistically removing a node and then putting it back in place more conservative than only removing it once we know that will be the final decision.

> Allowing plans to decide how any leftover filters, whether from parents or generated by the plan itself, should be handled (I think this will be useful for TopK).

> That’s still possible with the current approach.

It's not clear to me how. The current approach hardcodes the behavior for FilterExec; we'd have to either hardcode it for SortExec (and later HashJoin, etc.) or add more methods to ExecutionPlan, which would make the current implementation even more complex.

> If I'm not getting something wrong, these issues don't block any implementation; you are only worried about some design nuances and performance?

This is blocking work to some extent in the sense that I've had to work around the limitations of the current implementation in the TopK filter pushdown and other open PRs.

Ultimately this is a working implementation that covers all cases and solves the current limitations. I do not understand the need to delay progress so we can shoehorn the implementation into specific helper APIs for recursion that limit how we structure our user-facing APIs.

@berkaysynnada (Contributor)

I understand your concerns and motivations. Since you're leading this work, I'm happy to follow your direction. I'm sure we both aim to do what's right, even if our priorities and perspectives naturally differ.

I'd like to take one more day to push a commit to this PR, mainly focusing on stylistic and idiomatic improvements as well as some comments (so we can be confident that at least two people stand behind it 😄). If you disagree with any of the changes, please feel free to iterate on those points. Thanks for the work!

@adriangb (Contributor, Author)

Sounds good, happy to wait for your suggestions. And we can always iterate again!

@alamb mentioned this pull request Apr 29, 2025
@alamb (Contributor) left a comment

Thank you @adriangb and @berkaysynnada

I am approving this PR because I think the plans that come out are better (filters stay in the same evaluation order), though I do think that could be done more simply.

I do think it would be really good to avoid splitting the logic into two in-sync methods on ExecutionPlan (see comments)

I am mostly +0 on the code reorganization (I don't think it is any better or worse than what is on main)

Thank you both for working on this feature and I apologize for the late review

@@ -154,29 +153,25 @@ impl FileSource for TestSource {

 fn try_pushdown_filters(
     &self,
-    mut fd: FilterDescription,
+    filters: &[Arc<dyn PhysicalExpr>],
Contributor

I would personally prefer keeping a struct as an argument, as it:

  1. Is easier to document / explain what the structures mean via comments
  2. Makes it clearer in the struct definition what types of operations (e.g. append-only vs swap, etc.) are done
  3. Is easier to add new fields if needed

Basically, something like

pub struct FilterDescription<'a> {
    filters: &'a [Arc<dyn PhysicalExpr>],
}

...

This is a preference thing, and I can easily see alternative opinions.

Contributor Author

Happy to do that 👍🏻

     config: &ConfigOptions,
-) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+    let mut all_filters = filters.iter().map(Arc::clone).collect::<Vec<_>>();
Contributor

For example, if this was in a struct, we could implement clone() for the struct and hide this complexity
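
A hypothetical sketch (the struct name is invented; deriving Clone works because Arc clones are cheap):

use std::sync::Arc;
use datafusion_physical_expr::PhysicalExpr;

// Callers could then write `filters.clone()` instead of spelling out
// the iter/map/collect dance at every call site.
#[derive(Clone)]
pub struct Filters {
    filters: Vec<Arc<dyn PhysicalExpr>>,
}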

Contributor Author

Wouldn't the same be true if it was just Vec<Arc<dyn PhysicalExpr>>?

I tried implementing a struct with some helper methods, but for ExecutionPlan we have a slightly different flow than for DataSource and FileSource (which are their own traits and hence need their own method), so re-using the struct becomes a bit wonky.

@@ -278,7 +273,7 @@ fn test_filter_collapse() {
   - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
 output:
 Ok:
-  - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+  - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
Contributor

This actually looks like an improvement to me, as now a = foo will be evaluated before b = bar, as was done in the input plan. This might be important for short-circuiting, perhaps.

The prior version of this optimization seems to have reordered them

impl Default for PushdownFilter {
    fn default() -> Self {
        Self::new()
    }
}

pub type FilterDescriptionContext = PlanContext<FilterDescription>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterPushdownState {
Contributor

It would be nice to add some comments here.

.into_iter()
.map(Arc::clone)
.collect::<Vec<_>>();
let mut parent_pushdown_result =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One idea to make this code easier to follow might be to break it up into multiple functions, maybe even a struct or something that held the key state:

pub struct SingleNodePushdown {
    pushdown_plan: ....
    new_children: Vec<>
}
...

/// See [`PushdownFilter`] for more details.
///
/// [`PushdownFilter`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
fn try_pushdown_filters(
Contributor

To be honest, I prefer the previous implementation with just a single method on ExecutionPlan -- I think having two methods whose implementations must remain synchronized is more complicated to reason about and will likely be harder to work with.

@adriangb (Contributor, Author) commented May 1, 2025

Agreed, I'd prefer a single method as well -- but as per #15770 (comment) we probably would have had to add a new method to the existing implementation anyway. I don't see a way to have a single method without making it absurdly convoluted just to avoid 2 methods.

I'll also point out that there are multiple dimensions of filter pushdown for each node:

  • does the node allow parent filters to be pushed down to its children?
  • does the node want to add any filters of its own to be passed to its children?
  • does the node need to handle the result of pushdown?

Having two methods makes it easy to provide canned implementations for each of these independently and to combine/compose them, as sketched after the examples below.

E.g.:

  • TopK or HashJoin will override gather_filters_for_pushdown but not handle_child_pushdown_result
  • FilterExec will override both
  • DataSourceExec will override only handle_child_pushdown_result
  • ProjectionExec will override only gather_filters_for_pushdown
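
To make the split concrete, here is a rough sketch of the shape of the two methods using stub types (all names and signatures here are simplified assumptions for illustration, not the PR's exact API):

use std::sync::Arc;

// Stub standing in for the real expression type.
pub struct PhysicalExprStub;

pub struct FilterDescription {
    /// Parent filters this node is willing to let flow to its children.
    pub parent_filters: Vec<Arc<PhysicalExprStub>>,
    /// Filters this node generates itself (e.g. a TopK threshold).
    pub self_filters: Vec<Arc<PhysicalExprStub>>,
}

pub struct ChildPushdownResult {
    /// Whether each pushed filter was absorbed by the children.
    pub absorbed: Vec<bool>,
}

pub trait FilterPushdownNode {
    /// Phase 1 (top-down): declare which filters flow to the children.
    /// Default: forward all parent filters and contribute none of our own.
    fn gather_filters_for_pushdown(
        &self,
        parent_filters: Vec<Arc<PhysicalExprStub>>,
    ) -> FilterDescription {
        FilterDescription { parent_filters, self_filters: vec![] }
    }

    /// Phase 2 (bottom-up): react to what the children actually accepted.
    /// Default: pass the result through; leftover filters stay with the parent.
    fn handle_child_pushdown_result(&self, result: ChildPushdownResult) -> ChildPushdownResult {
        result
    }
}

With defaults like these, each operator overrides only the phase it cares about, which is the composability being argued for above.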

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am convinced

// NOTE that the caller of try_pushdown_filters() should handle these remaining predicates,
// possibly introducing a FilterExec on top of this operator.
pub remaining_description: FilterDescription,
/// The result of pushing down filters into a node that it returns to its parent.
Contributor

These are great comments.

@berkaysynnada (Contributor)

Hi @alamb. I have a WIP commit for this PR. I'm trying both to address @adriangb's concerns and needs, and at the same time to keep complexity to a minimum and make things similar to other DF patterns. I think I can finish it by the end of today. Thank you for your patience.

@alamb (Contributor) commented May 1, 2025

> Hi @alamb. I have a WIP commit for this PR. I'm trying both to address @adriangb's concerns and needs, and at the same time to keep complexity to a minimum and make things similar to other DF patterns. I think I can finish it by the end of today. Thank you for your patience.

Thank you for your work and help. I will not merge this PR until I hear from you.

@berkaysynnada berkaysynnada force-pushed the filter-pushdown-change branch from 5e59d74 to f732b1a Compare May 2, 2025 10:05
@github-actions github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label May 2, 2025
@berkaysynnada (Contributor)

Hi again @adriangb. I've pushed the commit, but unfortunately I can't say I fully achieved what I had in mind. Most of the work is idiomatic changes, comments, and naming -- I'd still love to hear your thoughts on them.

There are a few points I'm not completely comfortable with:

  1. New ExecutionPlan APIs: We ended up introducing two new APIs, and I know we can't merge them into one. However, the existing ExecutionPlan APIs are general-purpose, self-explanatory, and simple, whereas these new ones feel harder to understand and are quite specific to the current rule.
    I initially tried to add something like filter(&self) -> Arc<dyn PhysicalExpr> and try_filter_pushdown(&self, filter: Arc<dyn PhysicalExpr>) -> Self, but couldn't get it to pass all the tests. It seems theoretically possible, but I prefer not to spend more time on it at this stage.
  2. Related to the first item, I'm also a bit concerned that the two new APIs -- gather_filters_for_pushdown and handle_child_pushdown_result -- are both doing parts of the pushdown work. The first one analyzes, the second applies changes. This feels a bit confusing, as the responsibilities aren't clearly separated (I had always pictured the APIs as: 1) report that you have a filter, 2) decide how those filters are treated). At the least, we should document in detail what each method and parameter means (I couldn't put them into words :/ ).
  3. Another thing I couldn't fully understand is why we need to separate the analysis & pushdown work of parent filters vs. self filters. Could you give an example where keeping them separate is necessary?
    If this distinction isn't critical, it could simplify handle_child_pushdown_result(), since we could avoid passing parent_filters: Vec<Arc<dyn PhysicalExpr>>, helping address item 1 as well.

I don't want to hold up this PR any longer and block your further work, but I wanted to write down my concerns so we can figure out cleaner, simpler ways of achieving the same goal. This is a great feature, and it will be a shame if it ends up being understandable, extendable, or usable by only a few of us 😄
