Skip to content

Commit fe7e14b

Browse files
committed
[FLINK-37543][table-planner] Support sink reuse in batch mode
1 parent b892cb6 commit fe7e14b

File tree

9 files changed

+918
-2
lines changed

9 files changed

+918
-2
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java

+11
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ public class OptimizerConfigOptions {
105105
+ TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
106106
+ " is true.");
107107

108+
/**
* Boolean option {@code table.optimizer.reuse-sink-enabled}; defaults to {@code false}. When set
* to true, the optimizer tries to find duplicated table sinks and reuse them. As stated in the
* description, it only takes effect when {@link #TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED} is true.
*
* <p>NOTE(review): the option is tagged BATCH_STREAMING, but SubplanReuser appears to apply sink
* reuse only when {@code flinkContext.isBatchMode} holds; confirm whether the tag is intended.
*/
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
109+
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
110+
key("table.optimizer.reuse-sink-enabled")
111+
.booleanType()
112+
.defaultValue(false)
113+
.withDescription(
114+
"When it is true, the optimizer will try to find out duplicated table sinks and "
115+
+ "reuse them. This works only when "
116+
+ TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
117+
+ " is true.");
118+
108119
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
109120
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED =
110121
key("table.optimizer.source.report-statistics-enabled")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.reuse;
20+
21+
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
22+
import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
23+
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
24+
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
25+
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
26+
import org.apache.flink.util.Preconditions;
27+
28+
import org.apache.calcite.plan.RelTraitSet;
29+
import org.apache.calcite.rel.RelNode;
30+
import org.apache.calcite.rel.core.Union;
31+
32+
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.Collections;
35+
import java.util.IdentityHashMap;
36+
import java.util.List;
37+
import java.util.Set;
38+
import java.util.stream.Collectors;
39+
40+
/**
41+
* This checks each sink node to see if it can be reused with another sink node. If so, we will
42+
* merge all reusable sinks into one instance. This is only used in the STATEMENT SET clause with multiple
43+
* INSERT INTO.
44+
*
45+
* <p>Examples in SQL look like:
46+
*
47+
* <pre>{@code
48+
* BEGIN STATEMENT SET;
49+
* INSERT INTO sink1 SELECT * FROM source1;
50+
* INSERT INTO sink1 SELECT * FROM source2;
51+
* INSERT INTO sink2 SELECT * FROM source3;
52+
* END;
53+
* }</pre>
54+
*
55+
* <p>The plan is as follows:
56+
*
57+
* <pre>{@code
58+
* TableScan1 —— Sink1
59+
* TableScan2 —— Sink1
60+
* TableScan3 —— Sink2
61+
* }</pre>
62+
*
63+
* <p>After reuse, the plan is changed as follows:
64+
*
65+
* <pre>{@code
66+
* TableScan1 --\
67+
* Union -- Sink1
68+
* TableScan2 --/
69+
*
70+
* TableScan3 —— Sink2
71+
* }</pre>
72+
*/
73+
public class SinkReuser {
74+
75+
public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
76+
// Find all sinks
77+
List<Sink> allSinkNodes =
78+
relNodes.stream()
79+
.filter(node -> node instanceof Sink)
80+
.map(node -> (Sink) node)
81+
.collect(Collectors.toList());
82+
List<ReusableSinkGroup> reusableSinkGroups = groupReusableSink(allSinkNodes);
83+
84+
Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
85+
86+
// Remove all unused sink nodes
87+
return relNodes.stream()
88+
.filter(root -> !(root instanceof Sink) || reusedSinkNodes.contains(root))
89+
.collect(Collectors.toList());
90+
}
91+
92+
private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> reusableSinkGroups) {
93+
final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new IdentityHashMap<>());
94+
reusableSinkGroups.forEach(
95+
group -> {
96+
List<Sink> originalSinks = group.originalSinks;
97+
if (originalSinks.size() <= 1) {
98+
Preconditions.checkState(originalSinks.size() == 1);
99+
reusedSinkNodes.add(originalSinks.get(0));
100+
return;
101+
}
102+
List<RelNode> allSinkInputs = new ArrayList<>();
103+
for (Sink sinkNode : originalSinks) {
104+
allSinkInputs.add(sinkNode.getInput());
105+
}
106+
// Use the first sink node as the final reused sink node
107+
Sink reusedSink = originalSinks.get(0);
108+
109+
Union unionForReusedSinks;
110+
111+
unionForReusedSinks =
112+
new BatchPhysicalUnion(
113+
reusedSink.getCluster(),
114+
group.inputTraitSet,
115+
allSinkInputs,
116+
true,
117+
// use sink input row type
118+
reusedSink.getRowType());
119+
120+
reusedSink.replaceInput(0, unionForReusedSinks);
121+
reusedSinkNodes.add(reusedSink);
122+
});
123+
return reusedSinkNodes;
124+
}
125+
126+
/**
127+
* Grouping sinks that can be reused with each other.
128+
*
129+
* @param allSinkNodes in the plan.
130+
* @return a list contains all grouped sink.
131+
*/
132+
private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) {
133+
List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
134+
135+
for (Sink currentSinkNode : allSinkNodes) {
136+
boolean canBeReused = false;
137+
String currentSinkDigest = getDigest(currentSinkNode);
138+
SinkAbilitySpec[] currentSinkSpecs;
139+
140+
currentSinkSpecs = ((BatchPhysicalSink) currentSinkNode).abilitySpecs();
141+
RelTraitSet currentInputTraitSet = currentSinkNode.getInput().getTraitSet();
142+
for (ReusableSinkGroup group : reusableSinkGroups) {
143+
// Only table sink with the same digest, specs and input trait set can be reused
144+
if (!(group.digest.equals(currentSinkDigest)
145+
&& Arrays.equals(group.sinkAbilitySpecs, currentSinkSpecs)
146+
&& group.inputTraitSet.equals(currentInputTraitSet))) {
147+
continue;
148+
}
149+
150+
canBeReused = true;
151+
group.originalSinks.add(currentSinkNode);
152+
break;
153+
}
154+
155+
if (!canBeReused) {
156+
reusableSinkGroups.add(
157+
new ReusableSinkGroup(
158+
currentSinkNode,
159+
currentSinkDigest,
160+
currentSinkSpecs,
161+
currentInputTraitSet));
162+
}
163+
}
164+
return reusableSinkGroups;
165+
}
166+
167+
public String getDigest(Sink sink) {
168+
List<String> digest = new ArrayList<>();
169+
digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
170+
171+
int[][] targetColumns = sink.targetColumns();
172+
if (targetColumns != null && targetColumns.length > 0) {
173+
digest.add(
174+
"targetColumns=["
175+
+ Arrays.stream(targetColumns)
176+
.map(Arrays::toString)
177+
.collect(Collectors.joining(","))
178+
+ "]");
179+
}
180+
181+
String fields =
182+
sink.getRowType().getFieldList().stream()
183+
.map(f -> f.getType().toString())
184+
.collect(Collectors.joining(", "));
185+
digest.add("fields=[" + fields + "]");
186+
if (!sink.hints().isEmpty()) {
187+
digest.add("hints=" + RelExplainUtil.hintsToString(sink.hints()));
188+
}
189+
190+
return digest.toString();
191+
}
192+
193+
private static class ReusableSinkGroup {
194+
private final List<Sink> originalSinks = new ArrayList<>();
195+
196+
private final SinkAbilitySpec[] sinkAbilitySpecs;
197+
198+
private final RelTraitSet inputTraitSet;
199+
200+
private final String digest;
201+
202+
ReusableSinkGroup(
203+
Sink sink,
204+
String digest,
205+
SinkAbilitySpec[] sinkAbilitySpecs,
206+
RelTraitSet inputTraitSet) {
207+
this.originalSinks.add(sink);
208+
this.sinkAbilitySpecs = sinkAbilitySpecs;
209+
this.inputTraitSet = inputTraitSet;
210+
this.digest = digest;
211+
}
212+
}
213+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ abstract class PlannerBase(
393393
// Runs the optimizer over the given root nodes and returns the optimized roots.
@VisibleForTesting
394394
private[flink] def optimize(relNodes: Seq[RelNode]): Seq[RelNode] = {
395395
val optimizedRelNodes = getOptimizer.optimize(relNodes)
396-
require(optimizedRelNodes.size == relNodes.size)
396+
// The result may hold fewer roots than the input: SinkReuser.reuseDuplicatedSink drops sink
// roots that were merged into another sink (presumably reached through the optimizer's
// sub-plan reuse step; confirm). It must never hold more.
require(optimizedRelNodes.size <= relNodes.size)
397397
optimizedRelNodes
398398
}
399399

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class BatchPhysicalSink(
4242
contextResolvedTable: ContextResolvedTable,
4343
tableSink: DynamicTableSink,
4444
targetColumns: Array[Array[Int]],
45-
abilitySpecs: Array[SinkAbilitySpec])
45+
val abilitySpecs: Array[SinkAbilitySpec])
4646
extends Sink(cluster, traitSet, inputRel, hints, targetColumns, contextResolvedTable, tableSink)
4747
with BatchPhysicalRel {
4848

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala

+7
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,18 @@ object SubplanReuser {
6363
val tableSourceReuseEnabled =
6464
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED)
6565

66+
val tableSinkReuseEnabled =
67+
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED)
68+
6669
var newRels = rels
6770
if (tableSourceReuseEnabled) {
6871
newRels = new ScanReuser(flinkContext, flinkTypeFactory).reuseDuplicatedScan(rels)
6972
}
7073

74+
if (tableSinkReuseEnabled && flinkContext.isBatchMode) {
75+
newRels = new SinkReuser().reuseDuplicatedSink(rels)
76+
}
77+
7178
val context = new SubplanReuseContext(tableSourceReuseEnabled, newRels: _*)
7279
val reuseShuttle = new SubplanReuseShuttle(context)
7380
newRels.map(_.accept(reuseShuttle))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.batch.sql;
20+
21+
import org.apache.flink.table.api.TableConfig;
22+
import org.apache.flink.table.planner.plan.common.SinkReuseTestBase;
23+
import org.apache.flink.table.planner.plan.reuse.SinkReuser;
24+
import org.apache.flink.table.planner.utils.TableTestUtil;
25+
26+
/**
* Tests for {@link SinkReuser} in batch mode. The class only supplies the batch test utility to
* {@link SinkReuseTestBase}; no test methods are declared here.
*/
27+
public class BatchSinkReuseTest extends SinkReuseTestBase {
28+
/** Returns the batch-mode {@link TableTestUtil} created from the given table config. */
@Override
29+
protected TableTestUtil getTableTestUtil(TableConfig tableConfig) {
30+
return batchTestUtil(tableConfig);
31+
}
32+
}

0 commit comments

Comments
 (0)