Skip to content

Commit ae09a0d

Browse files
[FLINK-37407][state] Add savepoint metadata SQL built-in process function
1 parent 4db8c48 commit ae09a0d

File tree

11 files changed

+342
-4
lines changed

11 files changed

+342
-4
lines changed

docs/content/docs/libs/state_processor_api.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,28 @@ Before you interrogate state using the table API, make sure to review our [Flink
515515

516516
IMPORTANT NOTE: State Table API only supports keyed state.
517517

518+
### Metadata
519+
520+
The following SQL table function allows users to read the metadata of savepoints and checkpoints in the following way:
521+
```SQL
522+
LOAD MODULE state;
523+
SELECT * FROM savepoint_metadata('/root/dir/of/checkpoint-data/chk-1');
524+
```
525+
526+
The new table function creates a table with the following fixed schema:
527+
528+
| Key | Data type | Description |
529+
|------------------------------------------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
530+
| checkpoint-id | BIGINT NOT NULL | Checkpoint ID. |
531+
| operator-name | STRING | Operator Name. |
532+
| operator-uid | STRING | Operator UID. |
533+
| operator-uid-hash | STRING NOT NULL | Operator UID hash. |
534+
| operator-parallelism | INT NOT NULL | Parallelism of the operator. |
535+
| operator-max-parallelism | INT NOT NULL | Maximum parallelism of the operator. |
536+
| operator-subtask-state-count | INT NOT NULL | Number of operator subtask states. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned (for example broadcast source). |
537+
| operator-coordinator-state-size-in-bytes | BIGINT NOT NULL | The operator’s coordinator state size in bytes, or zero if no coordinator state. |
538+
| operator-total-size-in-bytes | BIGINT NOT NULL | Total operator state size in bytes. |
539+
518540
### Keyed State
519541

520542
[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.

flink-libraries/flink-state-processing-api/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ under the License.
5454
<scope>provided</scope>
5555
</dependency>
5656

57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-table-common</artifactId>
60+
<version>${project.version}</version>
61+
<scope>provided</scope>
62+
</dependency>
63+
5764
<dependency>
5865
<groupId>org.apache.flink</groupId>
5966
<artifactId>flink-table-runtime</artifactId>

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
import static org.apache.flink.configuration.description.TextElement.code;
2828

29-
/** Options for the state connector. */
29+
/** Options for the savepoint connector. */
3030
@PublicEvolving
3131
public class SavepointConnectorOptions {
3232

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
import java.util.List;
4545

46-
/** State data stream scan provider. */
46+
/** Savepoint data stream scan provider. */
4747
public class SavepointDataStreamScanProvider implements DataStreamScanProvider {
4848
@Nullable private final String stateBackendType;
4949
private final String statePath;

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public DynamicTableSource copy() {
6666

6767
@Override
6868
public String asSummaryString() {
69-
return "State Table Source";
69+
return "Savepoint Table Source";
7070
}
7171

7272
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.runtime.checkpoint.OperatorState;
23+
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
24+
import org.apache.flink.state.api.runtime.SavepointLoader;
25+
import org.apache.flink.table.annotation.DataTypeHint;
26+
import org.apache.flink.table.annotation.FunctionHint;
27+
import org.apache.flink.table.functions.SpecializedFunction;
28+
import org.apache.flink.table.functions.TableFunction;
29+
import org.apache.flink.types.Row;
30+
31+
@Internal
32+
@FunctionHint(
33+
output =
34+
@DataTypeHint(
35+
"ROW<checkpoint-id BIGINT NOT NULL, "
36+
+ "operator-name STRING, "
37+
+ "operator-uid STRING, operator-uid-hash STRING NOT NULL, "
38+
+ "operator-parallelism INT NOT NULL, "
39+
+ "operator-max-parallelism INT NOT NULL, "
40+
+ "operator-subtask-state-count INT NOT NULL, "
41+
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, "
42+
+ "operator-total-size-in-bytes BIGINT NOT NULL>"))
43+
public class SavepointMetadataTableFunction extends TableFunction<Row> {
44+
public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {}
45+
46+
public void eval(String savepointPath) {
47+
try {
48+
CheckpointMetadata checkpointMetadata =
49+
SavepointLoader.loadSavepointMetadata(savepointPath);
50+
51+
for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
52+
Row row = Row.withNames();
53+
row.setField("checkpoint-id", checkpointMetadata.getCheckpointId());
54+
row.setField("operator-name", operatorState.getOperatorName().orElse(null));
55+
row.setField("operator-uid", operatorState.getOperatorUid().orElse(null));
56+
row.setField("operator-uid-hash", operatorState.getOperatorID().toHexString());
57+
row.setField("operator-parallelism", operatorState.getParallelism());
58+
row.setField("operator-max-parallelism", operatorState.getMaxParallelism());
59+
row.setField("operator-subtask-state-count", operatorState.getStates().size());
60+
row.setField(
61+
"operator-coordinator-state-size-in-bytes",
62+
operatorState.getCoordinatorState() != null
63+
? operatorState.getCoordinatorState().getStateSize()
64+
: 0L);
65+
row.setField("operator-total-size-in-bytes", operatorState.getCheckpointedSize());
66+
collect(row);
67+
}
68+
} catch (Exception e) {
69+
throw new RuntimeException(e);
70+
}
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.state.table.SavepointMetadataTableFunction;
23+
import org.apache.flink.table.api.DataTypes;
24+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
25+
import org.apache.flink.table.functions.FunctionDefinition;
26+
import org.apache.flink.table.module.Module;
27+
import org.apache.flink.table.types.inference.TypeStrategies;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Locale;
32+
import java.util.Map;
33+
import java.util.Optional;
34+
import java.util.Set;
35+
import java.util.function.Function;
36+
import java.util.stream.Collectors;
37+
38+
import static org.apache.flink.table.functions.FunctionKind.TABLE;
39+
40+
/** Module of state in Flink. */
41+
@Experimental
42+
public class StateModule implements Module {
43+
44+
public static final String IDENTIFIER = "state";
45+
46+
public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
47+
BuiltInFunctionDefinition.newBuilder()
48+
.name("savepoint_metadata")
49+
.kind(TABLE)
50+
.runtimeClass(SavepointMetadataTableFunction.class.getName())
51+
.outputTypeStrategy(
52+
TypeStrategies.explicit(
53+
DataTypes.ROW(
54+
DataTypes.FIELD(
55+
"checkpoint-id", DataTypes.BIGINT().notNull()),
56+
DataTypes.FIELD("operator-name", DataTypes.STRING()),
57+
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
58+
DataTypes.FIELD(
59+
"operator-uid-hash",
60+
DataTypes.STRING().notNull()),
61+
DataTypes.FIELD(
62+
"operator-parallelism",
63+
DataTypes.INT().notNull()),
64+
DataTypes.FIELD(
65+
"operator-max-parallelism",
66+
DataTypes.INT().notNull()),
67+
DataTypes.FIELD(
68+
"operator-subtask-state-count",
69+
DataTypes.INT().notNull()),
70+
DataTypes.FIELD(
71+
"operator-coordinator-state-size-in-bytes",
72+
DataTypes.BIGINT().notNull()),
73+
DataTypes.FIELD(
74+
"operator-total-size-in-bytes",
75+
DataTypes.BIGINT().notNull()))))
76+
.build();
77+
78+
public static final StateModule INSTANCE = new StateModule();
79+
80+
private final Map<String, BuiltInFunctionDefinition> normalizedFunctions;
81+
private final Set<String> functionNamesWithInternal;
82+
private final Set<String> functionNamesWithoutInternal;
83+
84+
private StateModule() {
85+
final List<BuiltInFunctionDefinition> definitions =
86+
Collections.singletonList(SAVEPOINT_METADATA);
87+
this.normalizedFunctions =
88+
definitions.stream()
89+
.collect(
90+
Collectors.toMap(
91+
f -> f.getName().toUpperCase(Locale.ROOT),
92+
Function.identity()));
93+
this.functionNamesWithInternal =
94+
definitions.stream()
95+
.map(BuiltInFunctionDefinition::getName)
96+
.collect(Collectors.toSet());
97+
this.functionNamesWithoutInternal =
98+
definitions.stream()
99+
.filter(f -> !f.isInternal())
100+
.map(BuiltInFunctionDefinition::getName)
101+
.collect(Collectors.toSet());
102+
}
103+
104+
@Override
105+
public Set<String> listFunctions() {
106+
return listFunctions(false);
107+
}
108+
109+
@Override
110+
public Set<String> listFunctions(boolean includeHiddenFunctions) {
111+
if (includeHiddenFunctions) {
112+
return functionNamesWithInternal;
113+
} else {
114+
return functionNamesWithoutInternal;
115+
}
116+
}
117+
118+
@Override
119+
public Optional<FunctionDefinition> getFunctionDefinition(String name) {
120+
final String normalizedName = name.toUpperCase(Locale.ROOT);
121+
return Optional.ofNullable(normalizedFunctions.get(normalizedName));
122+
}
123+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.ConfigOption;
23+
import org.apache.flink.table.factories.ModuleFactory;
24+
import org.apache.flink.table.module.Module;
25+
26+
import java.util.Collections;
27+
import java.util.Set;
28+
29+
/** Factory for {@link StateModule}. */
30+
@Internal
31+
public class StateModuleFactory implements ModuleFactory {
32+
33+
@Override
34+
public String factoryIdentifier() {
35+
return StateModule.IDENTIFIER;
36+
}
37+
38+
@Override
39+
public Set<ConfigOption<?>> requiredOptions() {
40+
return Collections.emptySet();
41+
}
42+
43+
@Override
44+
public Set<ConfigOption<?>> optionalOptions() {
45+
return Collections.emptySet();
46+
}
47+
48+
@Override
49+
public Module createModule(Context context) {
50+
return StateModule.INSTANCE;
51+
}
52+
}

flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
org.apache.flink.state.table.module.StateModuleFactory
1617
org.apache.flink.state.table.SavepointDynamicTableSourceFactory

flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
3838
import static org.assertj.core.api.Assertions.assertThat;
3939

40-
/** Unit tests for the state SQL reader. */
40+
/** Unit tests for the savepoint SQL reader. */
4141
public class SavepointDynamicTableSourceTest {
4242
@Test
4343
@SuppressWarnings("unchecked")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table;
20+
21+
import org.apache.flink.api.common.RuntimeExecutionMode;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
24+
import org.apache.flink.table.api.Table;
25+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
26+
import org.apache.flink.types.Row;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.Comparator;
31+
import java.util.Iterator;
32+
import java.util.List;
33+
34+
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
/** Unit tests for the savepoint metadata SQL reader. */
38+
public class SavepointMetadataDynamicTableSourceTest {
39+
@Test
40+
public void testReadMetadata() throws Exception {
41+
Configuration config = new Configuration();
42+
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
43+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
44+
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
45+
46+
tEnv.executeSql("LOAD MODULE state");
47+
Table table =
48+
tEnv.sqlQuery("SELECT * FROM savepoint_metadata('src/test/resources/table-state')");
49+
List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
50+
result.sort(Comparator.comparing(a -> ((String) a.getField("operator-uid-hash"))));
51+
52+
assertThat(result.size()).isEqualTo(7);
53+
Iterator<Row> it = result.iterator();
54+
assertThat(it.next().toString())
55+
.isEqualTo(
56+
"+I[2, Source: broadcast-source, broadcast-source-uid, 3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]");
57+
assertThat(it.next().toString())
58+
.isEqualTo(
59+
"+I[2, keyed-broadcast-process, keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 4548]");
60+
}
61+
}

0 commit comments

Comments
 (0)