Skip to content

Commit 35bba3d

Browse files
committed
[FLINK-37610] Check if orderBy keys exists before accessing
- If ORDER BY clause is not specified in an Over agg, it would lead to IndexOutOfBoundsException - This commit adds a check to ensure orderby fields exists before accessing them
1 parent 5aaff07 commit 35bba3d

File tree

3 files changed

+54
-10
lines changed

3 files changed

+54
-10
lines changed

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -310,18 +310,26 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
310310
.newBuilder()
311311
.addContainedKind(ModifyKind.INSERT)
312312

313-
// All aggregates are computed over the same window and order by is supported for only 1 field
314-
val orderKeyIndex =
315-
over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
316-
val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
317313
if (
318-
!FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
319-
&& !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
314+
!over.logicWindow.groups.isEmpty && !over.logicWindow.groups
315+
.get(0)
316+
.orderKeys
317+
.getFieldCollations
318+
.isEmpty
320319
) {
321-
// Only non row-time/proc-time sort can support UPDATES
322-
builder.addContainedKind(ModifyKind.UPDATE)
323-
builder.addContainedKind(ModifyKind.DELETE)
324-
overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
320+
// All aggregates are computed over the same window and order by is supported for only 1 field
321+
val orderKeyIndex =
322+
over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
323+
val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
324+
if (
325+
!FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
326+
&& !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
327+
) {
328+
// Only non row-time/proc-time sort can support UPDATES
329+
builder.addContainedKind(ModifyKind.UPDATE)
330+
builder.addContainedKind(ModifyKind.DELETE)
331+
overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
332+
}
325333
}
326334
val children = visitChildren(over, overRequiredTrait)
327335
val providedTrait = new ModifyKindSetTrait(builder.build())

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,4 +526,27 @@ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2])
526526
]]>
527527
</Resource>
528528
</TestCase>
529+
<TestCase name="testWithoutOrderByClause">
530+
<Resource name="sql">
531+
<![CDATA[
532+
SELECT c,
533+
COUNT(a) OVER (PARTITION BY c) AS cnt1
534+
FROM MyTable
535+
]]>
536+
</Resource>
537+
<Resource name="ast">
538+
<![CDATA[
539+
LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2)])
540+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
541+
]]>
542+
</Resource>
543+
<Resource name="optimized exec plan">
544+
<![CDATA[
545+
Calc(select=[c, w0$o0 AS $1])
546+
+- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, COUNT(a) AS w0$o0])
547+
+- Exchange(distribution=[hash[c]])
548+
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
549+
]]>
550+
</Resource>
551+
</TestCase>
529552
</Root>

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,4 +467,17 @@ class OverAggregateTest extends TableTestBase {
467467
|)
468468
|""".stripMargin)
469469
}
470+
471+
@Test
472+
def testWithoutOrderByClause(): Unit = {
473+
val sql =
474+
"""
475+
|SELECT c,
476+
| COUNT(a) OVER (PARTITION BY c) AS cnt1
477+
|FROM MyTable
478+
""".stripMargin
479+
480+
util.verifyExecPlan(sql)
481+
}
482+
470483
}

0 commit comments

Comments
 (0)