Skip to content

Commit 19dd46d

Browse files
Blizzaraalamb
andauthored
fix: handle duplicate WindowFunction expressions in Substrait consumer (#15211)
* add a failing testcase having two times the same windowfunction expr fails planning with `Error: SchemaError(DuplicateUnqualifiedField { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" }, Some(""))` * deduplicate window expressions in substrait consumer * add test case for distinct windows in single project * fix consumer to handle different windows * make group_window_expr_by_sort_keys take intoiterator * fix clippy * add a note about ordering --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 8e2bfa4 commit 19dd46d

File tree

6 files changed

+344
-6
lines changed

6 files changed

+344
-6
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ impl LogicalPlanBuilder {
514514
/// Wrap a plan in a window
515515
pub fn window_plan(
516516
input: LogicalPlan,
517-
window_exprs: Vec<Expr>,
517+
window_exprs: impl IntoIterator<Item = Expr>,
518518
) -> Result<LogicalPlan> {
519519
let mut plan = input;
520520
let mut groups = group_window_expr_by_sort_keys(window_exprs)?;

datafusion/expr/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ pub fn compare_sort_expr(
575575

576576
/// Group a slice of window expression expr by their order by expressions
577577
pub fn group_window_expr_by_sort_keys(
578-
window_expr: Vec<Expr>,
578+
window_expr: impl IntoIterator<Item = Expr>,
579579
) -> Result<Vec<(WindowSortKey, Vec<Expr>)>> {
580580
let mut result = vec![];
581581
window_expr.into_iter().try_for_each(|expr| match &expr {

datafusion/substrait/src/logical_plan/consumer.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@ pub async fn from_project_rel(
10591059
p: &ProjectRel,
10601060
) -> Result<LogicalPlan> {
10611061
if let Some(input) = p.input.as_ref() {
1062-
let mut input = LogicalPlanBuilder::from(consumer.consume_rel(input).await?);
1062+
let input = consumer.consume_rel(input).await?;
10631063
let original_schema = Arc::clone(input.schema());
10641064

10651065
// Ensure that all expressions have a unique display name, so that
@@ -1075,6 +1075,10 @@ pub async fn from_project_rel(
10751075
// leaving only explicit expressions.
10761076

10771077
let mut explicit_exprs: Vec<Expr> = vec![];
1078+
// For WindowFunctions, we need to wrap them in a Window relation. If there are duplicates,
1079+
// we can do the window'ing only once, then the project will duplicate the result.
1080+
// Order here doesn't matter since LPB::window_plan sorts the expressions.
1081+
let mut window_exprs: HashSet<Expr> = HashSet::new();
10781082
for expr in &p.expressions {
10791083
let e = consumer
10801084
.consume_expression(expr, input.clone().schema())
@@ -1084,18 +1088,24 @@ pub async fn from_project_rel(
10841088
// Adding the same expression here and in the project below
10851089
// works because the project's builder uses columnize_expr(..)
10861090
// to transform it into a column reference
1087-
input = input.window(vec![e.clone()])?
1091+
window_exprs.insert(e.clone());
10881092
}
10891093
explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
10901094
}
10911095

1096+
let input = if !window_exprs.is_empty() {
1097+
LogicalPlanBuilder::window_plan(input, window_exprs)?
1098+
} else {
1099+
input
1100+
};
1101+
10921102
let mut final_exprs: Vec<Expr> = vec![];
10931103
for index in 0..original_schema.fields().len() {
10941104
let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
10951105
final_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
10961106
}
10971107
final_exprs.append(&mut explicit_exprs);
1098-
input.project(final_exprs)?.build()
1108+
project(input, final_exprs)
10991109
} else {
11001110
not_impl_err!("Projection without an input is not supported")
11011111
}

datafusion/substrait/tests/cases/logical_plans.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ mod tests {
4545
"Projection: NOT DATA.D AS EXPR$0\
4646
\n TableScan: DATA"
4747
);
48+
49+
// Trigger execution to ensure plan validity
50+
DataFrame::new(ctx.state(), plan).show().await?;
51+
4852
Ok(())
4953
}
5054

@@ -71,6 +75,63 @@ mod tests {
7175
\n WindowAggr: windowExpr=[[sum(DATA.D) PARTITION BY [DATA.PART] ORDER BY [DATA.ORD ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]\
7276
\n TableScan: DATA"
7377
);
78+
79+
// Trigger execution to ensure plan validity
80+
DataFrame::new(ctx.state(), plan).show().await?;
81+
82+
Ok(())
83+
}
84+
85+
#[tokio::test]
86+
async fn double_window_function() -> Result<()> {
87+
// Confirms a WindowExpr can be repeated in the same project.
88+
// This wouldn't normally happen with DF-created plans since CSE would eliminate the duplicate.
89+
90+
// File generated with substrait-java's Isthmus:
91+
// ./isthmus-cli/build/graal/isthmus --create "create table data (a int)" "select ROW_NUMBER() OVER (), ROW_NUMBER() OVER () AS aliased from data";
92+
let proto_plan =
93+
read_json("tests/testdata/test_plans/double_window.substrait.json");
94+
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
95+
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
96+
97+
assert_eq!(
98+
format!("{}", plan),
99+
"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW__temp__0 AS ALIASED\
100+
\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
101+
\n TableScan: DATA"
102+
);
103+
104+
// Trigger execution to ensure plan validity
105+
DataFrame::new(ctx.state(), plan).show().await?;
106+
107+
Ok(())
108+
}
109+
110+
#[tokio::test]
111+
async fn double_window_function_distinct_windows() -> Result<()> {
112+
// Confirms a single project can have multiple window functions with separate windows in it.
113+
// This wouldn't normally happen with DF-created plans since logical optimizer would
114+
// separate them out.
115+
116+
// File generated with substrait-java's Isthmus:
117+
// ./isthmus-cli/build/graal/isthmus --create "create table data (a int)" "select ROW_NUMBER() OVER (), ROW_NUMBER() OVER (PARTITION BY a) from data";
118+
let proto_plan = read_json(
119+
"tests/testdata/test_plans/double_window_distinct_windows.substrait.json",
120+
);
121+
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
122+
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
123+
124+
assert_eq!(
125+
format!("{}", plan),
126+
"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$1\
127+
\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
128+
\n WindowAggr: windowExpr=[[row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
129+
\n TableScan: DATA"
130+
);
131+
132+
// Trigger execution to ensure plan validity
133+
DataFrame::new(ctx.state(), plan).show().await?;
134+
74135
Ok(())
75136
}
76137

@@ -86,7 +147,7 @@ mod tests {
86147

87148
assert_eq!(format!("{}", &plan), "Values: (List([1, 2]))");
88149

89-
// Need to trigger execution to ensure that Arrow has validated the plan
150+
// Trigger execution to ensure plan validity
90151
DataFrame::new(ctx.state(), plan).show().await?;
91152

92153
Ok(())
@@ -107,6 +168,9 @@ mod tests {
107168
\n TableScan: sales"
108169
);
109170

171+
// Trigger execution to ensure plan validity
172+
DataFrame::new(ctx.state(), plan).show().await?;
173+
110174
Ok(())
111175
}
112176
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
{
2+
"extensionUris": [
3+
{
4+
"extensionUriAnchor": 1,
5+
"uri": "/functions_arithmetic.yaml"
6+
}
7+
],
8+
"extensions": [
9+
{
10+
"extensionFunction": {
11+
"extensionUriReference": 1,
12+
"functionAnchor": 0,
13+
"name": "row_number:"
14+
}
15+
}
16+
],
17+
"relations": [
18+
{
19+
"root": {
20+
"input": {
21+
"project": {
22+
"common": {
23+
"emit": {
24+
"outputMapping": [
25+
1,
26+
2
27+
]
28+
}
29+
},
30+
"input": {
31+
"read": {
32+
"common": {
33+
"direct": {
34+
}
35+
},
36+
"baseSchema": {
37+
"names": [
38+
"A"
39+
],
40+
"struct": {
41+
"types": [
42+
{
43+
"i32": {
44+
"typeVariationReference": 0,
45+
"nullability": "NULLABILITY_NULLABLE"
46+
}
47+
}
48+
],
49+
"typeVariationReference": 0,
50+
"nullability": "NULLABILITY_REQUIRED"
51+
}
52+
},
53+
"namedTable": {
54+
"names": [
55+
"DATA"
56+
]
57+
}
58+
}
59+
},
60+
"expressions": [
61+
{
62+
"windowFunction": {
63+
"functionReference": 0,
64+
"partitions": [],
65+
"sorts": [],
66+
"upperBound": {
67+
"currentRow": {
68+
}
69+
},
70+
"lowerBound": {
71+
"unbounded": {
72+
}
73+
},
74+
"phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
75+
"outputType": {
76+
"i64": {
77+
"typeVariationReference": 0,
78+
"nullability": "NULLABILITY_REQUIRED"
79+
}
80+
},
81+
"args": [],
82+
"arguments": [],
83+
"invocation": "AGGREGATION_INVOCATION_ALL",
84+
"options": [],
85+
"boundsType": "BOUNDS_TYPE_ROWS"
86+
}
87+
},
88+
{
89+
"windowFunction": {
90+
"functionReference": 0,
91+
"partitions": [],
92+
"sorts": [],
93+
"upperBound": {
94+
"currentRow": {
95+
}
96+
},
97+
"lowerBound": {
98+
"unbounded": {
99+
}
100+
},
101+
"phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
102+
"outputType": {
103+
"i64": {
104+
"typeVariationReference": 0,
105+
"nullability": "NULLABILITY_REQUIRED"
106+
}
107+
},
108+
"args": [],
109+
"arguments": [],
110+
"invocation": "AGGREGATION_INVOCATION_ALL",
111+
"options": [],
112+
"boundsType": "BOUNDS_TYPE_ROWS"
113+
}
114+
}
115+
]
116+
}
117+
},
118+
"names": [
119+
"EXPR$0",
120+
"ALIASED"
121+
]
122+
}
123+
}
124+
],
125+
"expectedTypeUrls": []
126+
}

0 commit comments

Comments
 (0)