Skip to content

Commit b17e271

Browse files
authored
improve push down filter aggregate (#13071)
1 parent c228a20 commit b17e271

24 files changed

+426
-159
lines changed

src/query/service/tests/it/sql/planner/format/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ fn test_format() {
165165
}
166166
.into(),
167167
],
168-
is_having: false,
169168
}
170169
.into(),
171170
),

src/query/sql/src/planner/binder/delete.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ impl Binder {
129129

130130
let filter = Filter {
131131
predicates: vec![scalar.clone()],
132-
is_having: false,
133132
};
134133
debug_assert_eq!(table_expr.plan.rel_op(), RelOp::Scan);
135134
let mut scan = match &*table_expr.plan {

src/query/sql/src/planner/binder/having.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,7 @@ impl Binder {
8888

8989
let predicates = split_conjunctions(&scalar);
9090

91-
let filter = Filter {
92-
predicates,
93-
is_having: true,
94-
};
91+
let filter = Filter { predicates };
9592

9693
Ok(SExpr::create_unary(
9794
Arc::new(filter.into()),

src/query/sql/src/planner/binder/join.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ impl Binder {
262262
Arc::new(
263263
Filter {
264264
predicates: left_push_down,
265-
is_having: false,
266265
}
267266
.into(),
268267
),
@@ -275,7 +274,6 @@ impl Binder {
275274
Arc::new(
276275
Filter {
277276
predicates: right_push_down,
278-
is_having: false,
279277
}
280278
.into(),
281279
),

src/query/sql/src/planner/binder/select.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,6 @@ impl Binder {
441441

442442
let filter_plan = Filter {
443443
predicates: split_conjunctions(&scalar),
444-
is_having: false,
445444
};
446445
let new_expr = SExpr::create_unary(Arc::new(filter_plan.into()), Arc::new(child));
447446
bind_context.set_expr_context(last_expr_context);

src/query/sql/src/planner/format/display_plan.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,7 @@ fn format_delete(delete: &DeletePlan) -> Result<String> {
235235
if let Some(selection) = &delete.selection {
236236
predicates.push(selection.clone());
237237
}
238-
let filter = RelOperator::Filter(Filter {
239-
predicates,
240-
is_having: false,
241-
});
238+
let filter = RelOperator::Filter(Filter { predicates });
242239
SExpr::create_unary(Arc::new(filter), Arc::new(scan_expr))
243240
};
244241
let res = s_expr.to_format_tree(&delete.metadata).format_pretty()?;

src/query/sql/src/planner/optimizer/heuristic/decorrelate.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ impl SubqueryRewriter {
245245
Arc::new(
246246
Filter {
247247
predicates: left_filters,
248-
is_having: false,
249248
}
250249
.into(),
251250
),
@@ -262,7 +261,6 @@ impl SubqueryRewriter {
262261
Arc::new(
263262
Filter {
264263
predicates: right_filters,
265-
is_having: false,
266264
}
267265
.into(),
268266
),
@@ -577,11 +575,7 @@ impl SubqueryRewriter {
577575
predicates.push(self.flatten_scalar(predicate, correlated_columns)?);
578576
}
579577

580-
let filter_plan = Filter {
581-
predicates,
582-
is_having: filter.is_having,
583-
}
584-
.into();
578+
let filter_plan = Filter { predicates }.into();
585579
Ok(SExpr::create_unary(
586580
Arc::new(filter_plan),
587581
Arc::new(flatten_plan),

src/query/sql/src/planner/optimizer/heuristic/subquery_rewriter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,6 @@ impl SubqueryRewriter {
439439
};
440440
let filter = Filter {
441441
predicates: vec![compare.into()],
442-
is_having: false,
443442
};
444443

445444
// Filter: COUNT(*) = 1 or COUNT(*) != 1

src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ impl DPhpy {
185185
if !op.non_equi_conditions.is_empty() {
186186
let filter = Filter {
187187
predicates: op.non_equi_conditions.clone(),
188-
is_having: false,
189188
};
190189
self.filters.insert(filter);
191190
}
@@ -584,10 +583,7 @@ impl DPhpy {
584583
predicates.extend(filter.clone().predicates.iter().cloned())
585584
}
586585
new_s_expr = SExpr::create_unary(
587-
Arc::new(RelOperator::Filter(Filter {
588-
predicates,
589-
is_having: false,
590-
})),
586+
Arc::new(RelOperator::Filter(Filter { predicates })),
591587
Arc::new(new_s_expr),
592588
);
593589
new_s_expr = self.push_down_filter(&new_s_expr)?;

src/query/sql/src/planner/optimizer/rule/rewrite/filter_join/derive_filter.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ pub fn try_derive_predicates(
8484
Arc::new(
8585
Filter {
8686
predicates: left_push_down,
87-
is_having: false,
8887
}
8988
.into(),
9089
),
@@ -97,7 +96,6 @@ pub fn try_derive_predicates(
9796
Arc::new(
9897
Filter {
9998
predicates: right_push_down,
100-
is_having: false,
10199
}
102100
.into(),
103101
),

src/query/sql/src/planner/optimizer/rule/rewrite/rule_eliminate_filter.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,7 @@ impl Rule for RuleEliminateFilter {
9494
if predicates.is_empty() {
9595
state.add_result(s_expr.child(0)?.clone());
9696
} else if origin_predicates.len() != predicates.len() {
97-
let filter = Filter {
98-
predicates,
99-
is_having: eval_scalar.is_having,
100-
};
97+
let filter = Filter { predicates };
10198
state.add_result(SExpr::create_unary(
10299
Arc::new(filter.into()),
103100
Arc::new(s_expr.child(0)?.clone()),

src/query/sql/src/planner/optimizer/rule/rewrite/rule_infer_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,6 @@ impl Rule for RuleInferFilter {
600600
Arc::new(
601601
Filter {
602602
predicates: new_predicates,
603-
is_having: filter.is_having,
604603
}
605604
.into(),
606605
),

src/query/sql/src/planner/optimizer/rule/rewrite/rule_merge_filter.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,7 @@ impl Rule for RuleMergeFilter {
7979
.into_iter()
8080
.chain(down_filter.predicates.into_iter())
8181
.collect();
82-
let merged = Filter {
83-
predicates,
84-
is_having: false,
85-
};
82+
let merged = Filter { predicates };
8683

8784
let new_expr = SExpr::create_unary(
8885
Arc::new(merged.into()),

src/query/sql/src/planner/optimizer/rule/rewrite/rule_normalize_disjunctive_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ impl Rule for RuleNormalizeDisjunctiveFilter {
178178
Arc::new(
179179
Filter {
180180
predicates: split_predicates,
181-
is_having: filter.is_having,
182181
}
183182
.into(),
184183
),

src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_aggregate.rs

Lines changed: 61 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,36 @@ use std::sync::Arc;
1616

1717
use crate::optimizer::rule::Rule;
1818
use crate::optimizer::rule::TransformResult;
19+
use crate::optimizer::RelExpr;
1920
use crate::optimizer::RuleID;
2021
use crate::optimizer::SExpr;
2122
use crate::plans::Aggregate;
22-
use crate::plans::AggregateMode;
2323
use crate::plans::Filter;
2424
use crate::plans::PatternPlan;
2525
use crate::plans::RelOp;
2626
use crate::plans::RelOp::Pattern;
27-
use crate::plans::RelOperator;
2827

29-
/// Heuristic optimizer runs in a bottom-up recursion fashion. If we match a plan like
30-
/// Filter-Aggregate-* and push down filter to Filter(Optional)-Aggregate-Filter-*, this will not
31-
/// work. RuleSplitAggregate will be applied first, since it's bottom up, then this rule, which
32-
/// cause the plan be like Filter(Optional)-Aggregate-Filter-Aggregate-*, which makes no sense.
33-
/// Hence we match 2 bundled Aggregate Ops:
28+
/// Input: Filter
29+
/// \
30+
/// Aggregate(Final or Partial)
31+
/// \
32+
/// *
3433
///
35-
/// Input: Filter
34+
/// Output:
35+
/// (1) Aggregate(Final or Partial)
3636
/// \
37-
/// Aggregate(Final)
38-
/// \
39-
/// Aggregate(Partial)
37+
/// Filter
4038
/// \
4139
/// *
4240
///
43-
/// Output: Filter(Optional)
41+
/// (2)
42+
/// Filter(remaining)
4443
/// \
45-
/// Aggregate(Final)
44+
/// Aggregate(Final or Partial)
4645
/// \
47-
/// Aggregate(Partial)
46+
/// Filter(pushed down)
4847
/// \
49-
/// Filter
50-
/// \
51-
/// *
48+
/// *
5249
pub struct RulePushDownFilterAggregate {
5350
id: RuleID,
5451
patterns: Vec<SExpr>,
@@ -72,17 +69,9 @@ impl RulePushDownFilterAggregate {
7269
}
7370
.into(),
7471
),
75-
Arc::new(SExpr::create_unary(
76-
Arc::new(
77-
PatternPlan {
78-
plan_type: RelOp::Aggregate,
79-
}
80-
.into(),
81-
),
82-
Arc::new(SExpr::create_leaf(Arc::new(
83-
PatternPlan { plan_type: Pattern }.into(),
84-
))),
85-
)),
72+
Arc::new(SExpr::create_leaf(Arc::new(
73+
PatternPlan { plan_type: Pattern }.into(),
74+
))),
8675
)),
8776
)],
8877
}
@@ -96,68 +85,54 @@ impl Rule for RulePushDownFilterAggregate {
9685

9786
fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> common_exception::Result<()> {
9887
let filter: Filter = s_expr.plan().clone().try_into()?;
99-
if filter.is_having {
100-
let agg_parent = s_expr.child(0)?;
101-
let agg_parent_plan: Aggregate = agg_parent.plan().clone().try_into()?;
102-
let agg_child = agg_parent.child(0)?;
103-
let agg_child_plan: Aggregate = agg_child.plan().clone().try_into()?;
104-
if agg_parent_plan.mode == AggregateMode::Final
105-
&& agg_child_plan.mode == AggregateMode::Partial
88+
let aggregate_expr = s_expr.child(0)?;
89+
let aggregate: Aggregate = aggregate_expr.plan().clone().try_into()?;
90+
let aggregate_child_prop =
91+
RelExpr::with_s_expr(aggregate_expr).derive_relational_prop_child(0)?;
92+
let aggregate_group_columns = aggregate.group_columns()?;
93+
let mut pushed_down_predicates = vec![];
94+
let mut remaining_predicates = vec![];
95+
for predicate in filter.predicates.into_iter() {
96+
let predicate_used_columns = predicate.used_columns();
97+
if predicate_used_columns.is_subset(&aggregate_child_prop.output_columns)
98+
&& predicate_used_columns.is_subset(&aggregate_group_columns)
10699
{
107-
let mut push_predicates = vec![];
108-
let mut remaining_predicates = vec![];
109-
for predicate in filter.predicates {
110-
let used_columns = predicate.used_columns();
111-
let mut pushable = true;
112-
for col in used_columns {
113-
if !agg_parent_plan.group_columns()?.contains(&col) {
114-
pushable = false;
115-
break;
116-
}
117-
}
118-
if pushable {
119-
push_predicates.push(predicate);
120-
} else {
121-
remaining_predicates.push(predicate);
122-
}
123-
}
124-
let mut result: SExpr;
125-
// No change since nothing can be pushed down.
126-
if push_predicates.is_empty() {
127-
result = s_expr.clone();
128-
} else {
129-
let filter_push_down_expr = SExpr::create_unary(
130-
Arc::new(RelOperator::Filter(Filter {
131-
predicates: push_predicates,
132-
is_having: false,
133-
})),
134-
Arc::new(agg_child.child(0)?.clone()),
135-
);
136-
let agg_with_filter_push_down_expr = SExpr::create_unary(
137-
Arc::new(RelOperator::Aggregate(agg_parent_plan)),
100+
pushed_down_predicates.push(predicate);
101+
} else {
102+
remaining_predicates.push(predicate)
103+
}
104+
}
105+
if !pushed_down_predicates.is_empty() {
106+
let pushed_down_filter = Filter {
107+
predicates: pushed_down_predicates,
108+
};
109+
let mut result = if remaining_predicates.is_empty() {
110+
SExpr::create_unary(
111+
Arc::new(aggregate.into()),
112+
Arc::new(SExpr::create_unary(
113+
Arc::new(pushed_down_filter.into()),
114+
Arc::new(aggregate_expr.child(0)?.clone()),
115+
)),
116+
)
117+
} else {
118+
let remaining_filter = Filter {
119+
predicates: remaining_predicates,
120+
};
121+
SExpr::create_unary(
122+
Arc::new(remaining_filter.into()),
123+
Arc::new(SExpr::create_unary(
124+
Arc::new(aggregate.into()),
138125
Arc::new(SExpr::create_unary(
139-
Arc::new(RelOperator::Aggregate(agg_child_plan)),
140-
Arc::new(filter_push_down_expr),
126+
Arc::new(pushed_down_filter.into()),
127+
Arc::new(aggregate_expr.child(0)?.clone()),
141128
)),
142-
);
143-
// All filters are pushed down.
144-
if remaining_predicates.is_empty() {
145-
result = agg_with_filter_push_down_expr;
146-
} else {
147-
// Partial filter can be pushed down.
148-
result = SExpr::create_unary(
149-
Arc::new(RelOperator::Filter(Filter {
150-
predicates: remaining_predicates,
151-
is_having: true,
152-
})),
153-
Arc::new(agg_with_filter_push_down_expr),
154-
);
155-
}
156-
}
157-
result.set_applied_rule(&self.id);
158-
state.add_result(result);
159-
}
129+
)),
130+
)
131+
};
132+
result.set_applied_rule(&self.id);
133+
state.add_result(result);
160134
}
135+
161136
Ok(())
162137
}
163138

src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_eval_scalar.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use std::sync::Arc;
1616

17-
use common_exception::ErrorCode;
1817
use common_exception::Result;
1918

2019
use crate::optimizer::rule::Rule;
@@ -91,10 +90,7 @@ impl RulePushDownFilterEvalScalar {
9190
return Ok(item.scalar.clone());
9291
}
9392
}
94-
Err(ErrorCode::UnknownColumn(format!(
95-
"Cannot find column to replace `{}`(#{})",
96-
column.column.column_name, column.column.index
97-
)))
93+
Ok(predicate.clone())
9894
}
9995
ScalarExpr::WindowFunction(window) => {
10096
let func = match &window.func {

src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ pub fn try_push_down_filter_join(
196196
Arc::new(
197197
Filter {
198198
predicates: original_predicates,
199-
is_having: false,
200199
}
201200
.into(),
202201
),

0 commit comments

Comments
 (0)