Skip to content

Move conversion of FIRST/LAST Aggregate function to independent physical optimizer rule #10061

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Apr 15, 2024

Conversation

jayzhan211
Copy link
Contributor

@jayzhan211 jayzhan211 commented Apr 12, 2024

Which issue does this PR close?

Closes #9972.

Rationale for this change

We plan to make FIRST / LAST UDAF. This rule does the conversion between FIRST/LAST, it will eventually be moved to aggregate-functions crate. The first step is to move it out to an independent rule.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Apr 12, 2024
&input_order_mode,
);

let aggr_exec = aggr_exec.new_with_aggr_expr_and_ordering_info(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the logic similar to AggregateExec::try_new_with_schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        Ok(AggregateExec {
            mode,
            group_by,
            aggr_expr,
            filter_expr,
            input,
            schema,
            input_schema,
            metrics: ExecutionPlanMetricsSet::new(),
            required_input_ordering,
            limit: None,
            input_order_mode,
            cache,
        })

@ozankabak
Copy link
Contributor

We will review and comment on this next week. @mustafasrepo

Signed-off-by: jayzhan211 <[email protected]>
///
/// Similar to the one in datafusion/physical-plan/src/aggregates/mod.rs, but this
/// function care only the possible conversion between FIRST_VALUE and LAST_VALUE
fn get_aggregate_exprs_requirement(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only first/last rule is moved to here.

Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Signed-off-by: jayzhan211 <[email protected]>
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jayzhan211 -- this is a very nice contribution towards extracting aggregate functions out of the core. I think we should fix the double recursion but otherwise this code looks (really) nice to me.

cc @mustafasrepo and @ozankabak FYI

/// so we can convert the aggregate expression to FirstValue(c1 order by asc),
/// since the current ordering is already satisfied, it saves our time!
#[derive(Default)]
pub struct ConvertFirstLast {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could call this something more general, like OptimizeAggregateOrder so it could potentially be used for aggregates other than FIRST_VALUE and LAST_VALUE 🤔

}

fn name(&self) -> &str {
"SimpleOrdering"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name should match the name of the structure -- that is "ConvertFirstLast" in this case

fn get_common_requirement_of_aggregate_input(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
// Optimize children
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this rule already calls transform_up which handles the recursion up the tree of ExecutionPlan and managine the transformedflag, I don't think you also need to recursively walk down the children here again. I think you can probably just call optimize_internal directly

Recursing back down the tree is also like N^2 (or worse) in the number of plan nodes so I think we should avoid it for performance reasons (in addition to making the code simpler)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing, it works! I think I have no idea what is going on in transform_up 😞

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    /// Convenience utility for writing optimizer rules: Recursively apply the
    /// given function `f` to all children of a node, and then to the node itself
    /// (post-order traversal). When `f` does not apply to a given node, it is
    /// left unchanged.

I didn't notice that the children is updated to parent too, so I do it manually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$SELF.map_children($F_CHILD)?.transform_parent(|n| $F_UP(n))

I think transform_parent here is actually doing transform_self 🤔

}
}

/// In `create_initial_plan` for LogicalPlan::Aggregate, we have a nested AggregateExec where the first layer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for this comment. It makes things much clearer

let reverse_aggr_req =
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);

if let Some(first_value) = aggr_expr.as_any().downcast_ref::<FirstValue>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually (some other PR) it would be amazing if we can move this code into FirstValue somehow. As it is now, there is a coupling between the optimizer rule and the actual PhysicalExpr -- which means among other things this same optimization can't be used by user defined aggregates

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be similar to FunctionRewrite, registerable optimize rule.

@ozankabak
Copy link
Contributor

Thank you @jayzhan211 -- this is a very nice contribution towards extracting aggregate functions out of the core. I think we should fix the double recursion but otherwise this code looks (really) nice to me.

cc @mustafasrepo and @ozankabak FYI

We will review this Monday

@alamb
Copy link
Contributor

alamb commented Apr 13, 2024

Sounds good -- thank you @ozankabak -- let's wait for that review prior to merging this PR

@@ -89,6 +90,8 @@ impl PhysicalOptimizer {
// as that rule may inject other operations in between the different AggregateExecs.
// Applying the rule early means only directly-connected AggregateExecs must be examined.
Arc::new(LimitedDistinctAggregation::new()),
// Run once before PartialFinalAggregation is rewritten to ensure the rule is applied correctly
Arc::new(OptimizeAggregateOrder::new()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this rule from here. Using it only in below place should be enough.

Copy link
Contributor

@mustafasrepo mustafasrepo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jayzhan211 for this PR. I have left some minor comment. Please note that, applying my suggestion changes couple of tests. However, I have verified that those changes are both valid, not harmful for the execution. We can merge this PR as is also.

Signed-off-by: jayzhan211 <[email protected]>
@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Apr 15, 2024

Arc::new(OptimizeAggregateOrder::new()),

Let me apply your suggestion! Thanks for your review @mustafasrepo and @alamb

@jayzhan211 jayzhan211 merged commit 4e9f2d5 into apache:main Apr 15, 2024
24 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Move conversion of FIRST/LAST Aggregate function to independent physical optimizer rule
4 participants