
Commit b892cb6

[FLINK-37540][table-planner] Introduce SupportsTargetColumnWriting sink ability
1 parent 8c9d4ee commit b892cb6

6 files changed, +213 -8 lines changed
org/apache/flink/table/connector/sink/abilities/SupportsTargetColumnWriting.java (new file, +62 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.connector.sink.abilities;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.sink.DynamicTableSink;

/**
 * Interface for {@link DynamicTableSink}s that support target column writing.
 *
 * <p>The planner parses the target columns from the DML clause and calls {@link
 * #applyTargetColumns(int[][])} to pass an array of column index paths to the sink.
 *
 * <p>The array indices are 0-based and support composite columns within (possibly nested)
 * structures. This information comes from the column list of the DML clause, e.g., for a sink
 * table t1 whose schema is {@code a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT}:
 *
 * <ul>
 *   <li>insert: 'insert into t1(a, b.b2) ...', the column list will be 'a, b.b2', and the planner
 *       will provide {@code [[0], [1, 1]]}. The statement 'insert into t1 select ...' will not
 *       apply this ability.
 *   <li>update: 'update t1 set a=1, b.b1=2 where ...', the column list will be 'a, b.b1', and the
 *       planner will provide {@code [[0], [1, 0]]}.
 * </ul>
 *
 * <p>Note: The planner will not apply this ability for DELETE statements because they have no
 * column list.
 *
 * <p>A sink can use this information to perform target column writing.
 *
 * <p>If this interface is implemented and {@link #applyTargetColumns(int[][])} returns true, the
 * planner will use this information for plan optimizations such as sink reuse.
 */
@PublicEvolving
public interface SupportsTargetColumnWriting {

    /**
     * Provides an array of column index paths related to the user-specified target column list.
     *
     * <p>See the documentation of {@link SupportsTargetColumnWriting} for more information.
     *
     * @param targetColumns column index paths
     * @return true if the target columns are applied successfully, false otherwise
     */
    boolean applyTargetColumns(int[][] targetColumns);
}

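For connector authors, a minimal sketch of how a custom sink might implement the new ability is shown below. The AuditLogSink class, its omitted runtime provider, and the "remember the paths for the writer" strategy are illustrative assumptions for this example, not part of the commit; only the SupportsTargetColumnWriting contract comes from the interface above.

package com.example.connector; // hypothetical package, not part of this commit

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;

/** Illustrative sink that remembers the target column paths handed over by the planner. */
public class AuditLogSink implements DynamicTableSink, SupportsTargetColumnWriting {

    // Index paths such as [[0], [1, 1]]; stays null when the DML has no column list.
    private int[][] targetColumns;

    @Override
    public boolean applyTargetColumns(int[][] targetColumns) {
        // Keep the paths so the runtime writer could restrict itself to these columns.
        this.targetColumns = targetColumns;
        return true; // returning true lets the planner use this for optimizations such as sink reuse
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // A real connector would build a writer here that only emits the target columns.
        throw new UnsupportedOperationException("runtime provider omitted in this sketch");
    }

    @Override
    public DynamicTableSink copy() {
        AuditLogSink copy = new AuditLogSink();
        copy.targetColumns = targetColumns;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "AuditLogSink";
    }
}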
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java (+22 -2)
@@ -48,6 +48,7 @@
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
@@ -63,6 +64,7 @@
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
@@ -279,7 +281,8 @@ private static RelNode convertSinkToRel(
                 isOverwrite,
                 sink,
                 contextResolvedTable.getResolvedTable(),
-                sinkAbilitySpecs);
+                sinkAbilitySpecs,
+                targetColumns);

         // rewrite rel node for delete
         if (isDelete) {
@@ -995,7 +998,8 @@ private static void prepareDynamicSink(
             boolean isOverwrite,
             DynamicTableSink sink,
             ResolvedCatalogTable table,
-            List<SinkAbilitySpec> sinkAbilitySpecs) {
+            List<SinkAbilitySpec> sinkAbilitySpecs,
+            int[][] targetColumns) {
         table.getDistribution()
                 .ifPresent(
                         distribution ->
@@ -1007,6 +1011,8 @@ private static void prepareDynamicSink(
         validateAndApplyOverwrite(tableDebugName, isOverwrite, sink, sinkAbilitySpecs);

         validateAndApplyMetadata(tableDebugName, sink, table.getResolvedSchema(), sinkAbilitySpecs);
+
+        validateAndApplyTargetColumns(sink, targetColumns, sinkAbilitySpecs);
     }

     /**
@@ -1285,6 +1291,20 @@ private static void validateAndApplyMetadata(
                         createConsumedType(schema, sink)));
     }

+    private static void validateAndApplyTargetColumns(
+            DynamicTableSink sink, int[][] targetColumns, List<SinkAbilitySpec> sinkAbilitySpecs) {
+        if (targetColumns == null || targetColumns.length == 0) {
+            return;
+        }
+
+        if (!(sink instanceof SupportsTargetColumnWriting)) {
+            // Ignore target columns if the sink doesn't support it.
+            return;
+        }
+
+        sinkAbilitySpecs.add(new TargetColumnWritingSpec(targetColumns));
+    }
+
     /**
      * Returns the {@link DataType} that a sink should consume as the output from the runtime.
      *

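As a usage illustration (not part of the commit), the sketch below shows the kind of statement whose column list the planner turns into index paths before validateAndApplyTargetColumns() runs. The table name t1, the built-in 'blackhole' connector, and the TargetColumnExample class are assumptions for the example; since 'blackhole' does not implement SupportsTargetColumnWriting, the derived paths would simply be ignored for it, exactly as the new method above does.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TargetColumnExample {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical sink table; any connector implementing SupportsTargetColumnWriting
        // would receive the target column paths for this schema.
        env.executeSql(
                "CREATE TABLE t1 (a STRING, b ROW<b1 INT, b2 STRING>, c BIGINT) "
                        + "WITH ('connector' = 'blackhole')");

        // The column list (a, b.b2) is what the planner maps to the index paths [[0], [1, 1]];
        // they reach the sink only if it implements the ability, otherwise they are dropped.
        env.executeSql("INSERT INTO t1 (a, b.b2) SELECT 'k', 'v'");
    }
}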
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java (+2 -1)

@@ -37,7 +37,8 @@
     @JsonSubTypes.Type(value = PartitioningSpec.class),
     @JsonSubTypes.Type(value = WritingMetadataSpec.class),
     @JsonSubTypes.Type(value = RowLevelDeleteSpec.class),
-    @JsonSubTypes.Type(value = RowLevelUpdateSpec.class)
+    @JsonSubTypes.Type(value = RowLevelUpdateSpec.class),
+    @JsonSubTypes.Type(value = TargetColumnWritingSpec.class)
 })
 @Internal
 public interface SinkAbilitySpec {
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/TargetColumnWritingSpec.java (new file, +79 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.abilities.sink;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Arrays;
import java.util.Objects;

/**
 * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the target column
 * indices to/from JSON, but can also apply the target columns to a {@link
 * SupportsTargetColumnWriting} sink.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("TargetColumnWriting")
public class TargetColumnWritingSpec implements SinkAbilitySpec {
    public static final String FIELD_NAME_TARGET_COLUMNS = "targetColumns";

    @JsonProperty(FIELD_NAME_TARGET_COLUMNS)
    private final int[][] targetColumns;

    @JsonCreator
    public TargetColumnWritingSpec(@JsonProperty(FIELD_NAME_TARGET_COLUMNS) int[][] targetColumns) {
        this.targetColumns = targetColumns;
    }

    @Override
    public void apply(DynamicTableSink tableSink) {
        if (tableSink instanceof SupportsTargetColumnWriting) {
            ((SupportsTargetColumnWriting) tableSink).applyTargetColumns(targetColumns);
        } else {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsTargetColumnWriting.",
                            tableSink.getClass().getName()));
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TargetColumnWritingSpec that = (TargetColumnWritingSpec) o;
        return Objects.deepEquals(targetColumns, that.targetColumns);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(targetColumns);
    }
}

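As a small, hedged illustration of how this spec is exercised on the planner side (for example when a compiled plan is restored), the sketch below constructs it with parsed index paths and applies it to a sink. The demo class and the mySink parameter are assumptions of the example; the apply() behavior shown in the comments is the one defined in the file above.

import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;

public class TargetColumnWritingSpecDemo {
    public static void applyTo(DynamicTableSink mySink) {
        // Paths parsed from "insert into t1(a, b.b2) ...": column a and nested field b.b2.
        int[][] targetColumns = new int[][] {{0}, {1, 1}};

        TargetColumnWritingSpec spec = new TargetColumnWritingSpec(targetColumns);

        // apply() forwards the paths via SupportsTargetColumnWriting#applyTargetColumns,
        // or throws a TableException if the sink does not implement the ability.
        spec.apply(mySink);
    }
}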
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java (+17 -4)
@@ -53,6 +53,7 @@
 import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
@@ -786,7 +787,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                     rowTimeIndex,
                     tableSchema,
                     requireBucketCount,
-                    supportsDeleteByKey);
+                    supportsDeleteByKey,
+                    null);
         } else {
             try {
                 return InstantiationUtil.instantiate(
@@ -2221,9 +2223,11 @@ private static class TestValuesTableSink
                     SupportsWritingMetadata,
                     SupportsPartitioning,
                     SupportsOverwrite,
-                    SupportsBucketing {
+                    SupportsBucketing,
+                    SupportsTargetColumnWriting {

         private DataType consumedDataType;
+        private int[][] targetColumns;
         private int[] primaryKeyIndices;
         private final String tableName;
         private final boolean isInsertOnly;
@@ -2250,7 +2254,8 @@ private TestValuesTableSink(
                 int rowtimeIndex,
                 TableSchema tableSchema,
                 boolean requireBucketCount,
-                boolean supportsDeleteByKey) {
+                boolean supportsDeleteByKey,
+                int[][] targetColumns) {
             this.consumedDataType = consumedDataType;
             this.primaryKeyIndices = primaryKeyIndices;
             this.tableName = tableName;
@@ -2264,6 +2269,7 @@ private TestValuesTableSink(
             this.tableSchema = tableSchema;
             this.requireBucketCount = requireBucketCount;
             this.supportsDeleteByKey = supportsDeleteByKey;
+            this.targetColumns = targetColumns;
         }

         @Override
@@ -2416,7 +2422,8 @@ public DynamicTableSink copy() {
                     rowtimeIndex,
                     tableSchema,
                     requireBucketCount,
-                    supportsDeleteByKey);
+                    supportsDeleteByKey,
+                    targetColumns);
         }

         @Override
@@ -2454,6 +2461,12 @@ public Set<TableDistribution.Kind> listAlgorithms() {
         public boolean requiresBucketCount() {
             return requireBucketCount;
         }
+
+        @Override
+        public boolean applyTargetColumns(int[][] targetColumns) {
+            this.targetColumns = targetColumns;
+            return true;
+        }
     }

     /** A TableSink used for testing the implementation of {@link SinkFunction.Context}. */

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java (+31 -1)
@@ -38,6 +38,7 @@
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.PartitioningSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.planner.utils.PlannerMocks;
@@ -173,7 +174,36 @@ static Stream<DynamicTableSinkSpec> testDynamicTableSinkSpecSerde() {
                                 RowType.of(new BigIntType(), new IntType()))),
                 null);

-        return Stream.of(spec1, spec2, spec3);
+        Map<String, String> options4 = new HashMap<>();
+        options4.put("connector", TestValuesTableFactory.IDENTIFIER);
+        int[][] targetColumnIndices = new int[][] {{0}, {1}};
+
+        final ResolvedSchema resolvedSchema4 =
+                new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.BIGINT()),
+                                Column.physical("b", DataTypes.INT()),
+                                Column.metadata("p", DataTypes.STRING(), null, false)),
+                        Collections.emptyList(),
+                        null);
+        final CatalogTable catalogTable4 =
+                CatalogTable.newBuilder()
+                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema4).build())
+                        .options(options4)
+                        .build();
+
+        DynamicTableSinkSpec spec4 =
+                new DynamicTableSinkSpec(
+                        ContextResolvedTable.temporary(
+                                ObjectIdentifier.of(
+                                        CatalogManagerMocks.DEFAULT_CATALOG,
+                                        CatalogManagerMocks.DEFAULT_DATABASE,
+                                        "MyTable"),
+                                new ResolvedCatalogTable(catalogTable4, resolvedSchema4)),
+                        Collections.singletonList(new TargetColumnWritingSpec(targetColumnIndices)),
+                        targetColumnIndices);
+
+        return Stream.of(spec1, spec2, spec3, spec4);
     }

     @ParameterizedTest
