[FLINK-37407][state] Add savepoint metadata SQL built-in process function #26393

**Merged** · 1 commit · Apr 7, 2025
22 changes: 22 additions & 0 deletions docs/content/docs/libs/state_processor_api.md
@@ -515,6 +515,28 @@ Before you interrogate state using the table API, make sure to review our [Flink

IMPORTANT NOTE: State Table API only supports keyed state.

### Metadata

The following SQL table function allows users to read the metadata of savepoints and checkpoints:
```SQL
LOAD MODULE state;
SELECT * FROM savepoint_metadata('/root/dir/of/checkpoint-data/chk-1');
```

**Review thread on `LOAD MODULE state;`**

Contributor: nit: if this is savepoints only, should we call it savepoint-state? Currently the implementation seems to be tied to a fixed savepoint schema. savepoint-state would be more descriptive and better scoped, allowing us to provide a more generic state module later if we want to.

Author: It seems all the remaining related questions ask whether this is savepoint-only? My answer to all of them is no. We've agreed on this naming in the discussion.

**Review thread on the `savepoint_metadata` call**

Contributor (@davidradl, Apr 3, 2025): The FLIP talks of checkpoint and savepoint state; can we use this function to access checkpoint state as well? Currently the function is savepoint_metadata; if it covers both, can we change the name so it applies to both savepoints and checkpoints, maybe state_table_metadata?

If checkpoints are not included, how does it error if we point this function at a checkpoint file? Maybe a unit test for that would be good.

Author: > can we use this function to access checkpoint state as well?

Yes. We can't name things both ways. Initially we agreed to call everything in the State Processor API savepoint..., and the only exception here is the module, because it's a reserved keyword in Flink SQL. If we go down this road, then the whole State Processor API, including internals, must be renamed, which I don't support.

The new table function creates a table with the following fixed schema:

| Key | Data type | Description |
|------------------------------------------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| checkpoint-id | BIGINT NOT NULL | Checkpoint ID. |
| operator-name | STRING | Operator Name. |
| operator-uid | STRING | Operator UID. |
| operator-uid-hash | STRING NOT NULL | Operator UID hash. |
| operator-parallelism | INT NOT NULL | Parallelism of the operator. |
| operator-max-parallelism | INT NOT NULL | Maximum parallelism of the operator. |
| operator-subtask-state-count             | INT NOT NULL    | Number of operator subtask states. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned (for example, a broadcast source). |
| operator-coordinator-state-size-in-bytes | BIGINT NOT NULL | The operator’s coordinator state size in bytes, or zero if no coordinator state. |
| operator-total-size-in-bytes | BIGINT NOT NULL | Total operator state size in bytes. |
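
As a usage illustration (an editor's sketch, not part of this diff), here is a minimal Java Table API example that loads the module and ranks operators by state size; the savepoint path is a hypothetical placeholder:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SavepointMetadataExample {
    public static void main(String[] args) {
        // Batch mode fits reading a static savepoint.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Registers savepoint_metadata as a built-in table function.
        tEnv.executeSql("LOAD MODULE state");

        // Hyphenated column names must be escaped with backticks in Flink SQL.
        tEnv.executeSql(
                        "SELECT `operator-name`, `operator-total-size-in-bytes` "
                                + "FROM savepoint_metadata('/tmp/savepoints/savepoint-1') "
                                + "ORDER BY `operator-total-size-in-bytes` DESC")
                .print();
    }
}
```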

### Keyed State

[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.
7 changes: 7 additions & 0 deletions flink-libraries/flink-state-processing-api/pom.xml
@@ -54,6 +54,13 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
@@ -26,7 +26,7 @@

import static org.apache.flink.configuration.description.TextElement.code;

/** Options for the state connector. */
/** Options for the savepoint connector. */
**Review thread on the comment change**

Contributor (@davidradl, Apr 3, 2025): nit: this is not really a connector as far as I can see.

Author: This change is unrelated to the metadata feature; everything was just renamed to savepoint, as we agreed. This is used by the connector...


@PublicEvolving
public class SavepointConnectorOptions {

@@ -43,7 +43,7 @@

import java.util.List;

/** State data stream scan provider. */
/** Savepoint data stream scan provider. */
public class SavepointDataStreamScanProvider implements DataStreamScanProvider {
@Nullable private final String stateBackendType;
private final String statePath;
@@ -66,7 +66,7 @@ public DynamicTableSource copy() {

@Override
public String asSummaryString() {
return "State Table Source";
return "Savepoint Table Source";
}

@Override
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@Internal

**Review thread on `@Internal`**

Contributor: Is this internal, given that it is driven directly from SQL?

Author: Similar classes have this annotation.

@FunctionHint(
output =
@DataTypeHint(
"ROW<checkpoint-id BIGINT NOT NULL, "
+ "operator-name STRING, "
+ "operator-uid STRING, operator-uid-hash STRING NOT NULL, "
+ "operator-parallelism INT NOT NULL, "
+ "operator-max-parallelism INT NOT NULL, "
+ "operator-subtask-state-count INT NOT NULL, "
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, "
+ "operator-total-size-in-bytes BIGINT NOT NULL>"))
public class SavepointMetadataTableFunction extends TableFunction<Row> {
public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {}

public void eval(String savepointPath) {
try {
CheckpointMetadata checkpointMetadata =
SavepointLoader.loadSavepointMetadata(savepointPath);

for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
Row row = Row.withNames();
row.setField("checkpoint-id", checkpointMetadata.getCheckpointId());
row.setField("operator-name", operatorState.getOperatorName().orElse(null));
row.setField("operator-uid", operatorState.getOperatorUid().orElse(null));
row.setField("operator-uid-hash", operatorState.getOperatorID().toHexString());
row.setField("operator-parallelism", operatorState.getParallelism());
row.setField("operator-max-parallelism", operatorState.getMaxParallelism());
row.setField("operator-subtask-state-count", operatorState.getStates().size());
**Review thread on lines +57 to +59**

Contributor: Ah, are operator-parallelism and operator-subtask-state-count providing the same value? It seems so.

Author (@gaborgsomogyi, Apr 7, 2025): No; for example, a broadcast source has operator-parallelism = 1 but operator-subtask-state-count = 0.

Contributor: Then I suggest adding a more detailed description for operator-subtask-state-count in the doc. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned. That's my understanding; correct me if I'm wrong.

Author: Yeah, this is a hidden gem and worth adding some more info about. Your understanding is correct, so I'll add your sentence as-is.


row.setField(
"operator-coordinator-state-size-in-bytes",
operatorState.getCoordinatorState() != null
? operatorState.getCoordinatorState().getStateSize()
: 0L);
row.setField("operator-total-size-in-bytes", operatorState.getCheckpointedSize());
collect(row);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
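
For context, the class above follows Flink's standard `TableFunction` pattern: `eval` is invoked once per function call in the query, and each `collect` emits one row of the result. A minimal standalone sketch of that pattern, using a hypothetical split function rather than anything from this PR:

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/** Hypothetical example: expands a comma-separated string into one row per entry. */
@FunctionHint(output = @DataTypeHint("ROW<entry STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String input) {
        for (String s : input.split(",")) {
            collect(Row.of(s)); // each collect() adds one row to the result table
        }
    }
}
```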
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table.module;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.state.table.SavepointMetadataTableFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.types.inference.TypeStrategies;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.functions.FunctionKind.TABLE;

/** Module of state in Flink. */
**Review thread on the class comment**

Contributor: We should mention savepoint if this is savepoint-only.

Author: It is not savepoint-only. As mentioned before, everything is called savepoint except the module, because that's a reserved keyword.

@Experimental
public class StateModule implements Module {
**Review thread on the class name**

Contributor (@davidradl, Apr 3, 2025): I suggest we rename it to SavepointStateModule so it is more descriptive. I wonder if we should refactor CoreModule so we can extend it, as the only difference seems to be the List<BuiltInFunctionDefinition> definitions.

public static final String IDENTIFIER = "state";
**Review thread on `IDENTIFIER`**

Contributor: I think the identifier should mention the word savepoint, maybe "savepoint-state".


public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
BuiltInFunctionDefinition.newBuilder()
.name("savepoint_metadata")
.kind(TABLE)
.runtimeClass(SavepointMetadataTableFunction.class.getName())
.outputTypeStrategy(
TypeStrategies.explicit(
DataTypes.ROW(
DataTypes.FIELD(
"checkpoint-id", DataTypes.BIGINT().notNull()),
DataTypes.FIELD("operator-name", DataTypes.STRING()),
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
DataTypes.FIELD(
"operator-uid-hash",
DataTypes.STRING().notNull()),
DataTypes.FIELD(
"operator-parallelism",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-max-parallelism",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-subtask-state-count",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-coordinator-state-size-in-bytes",
DataTypes.BIGINT().notNull()),
DataTypes.FIELD(
"operator-total-size-in-bytes",
DataTypes.BIGINT().notNull()))))
.build();

public static final StateModule INSTANCE = new StateModule();

private final Map<String, BuiltInFunctionDefinition> normalizedFunctions;
private final Set<String> functionNamesWithInternal;
private final Set<String> functionNamesWithoutInternal;

private StateModule() {
final List<BuiltInFunctionDefinition> definitions =
Collections.singletonList(SAVEPOINT_METADATA);
this.normalizedFunctions =
definitions.stream()
.collect(
Collectors.toMap(
f -> f.getName().toUpperCase(Locale.ROOT),
Function.identity()));
this.functionNamesWithInternal =
definitions.stream()
.map(BuiltInFunctionDefinition::getName)
.collect(Collectors.toSet());
this.functionNamesWithoutInternal =
definitions.stream()
.filter(f -> !f.isInternal())
.map(BuiltInFunctionDefinition::getName)
.collect(Collectors.toSet());
}

@Override
public Set<String> listFunctions() {
return listFunctions(false);
}

@Override
public Set<String> listFunctions(boolean includeHiddenFunctions) {
if (includeHiddenFunctions) {
return functionNamesWithInternal;
} else {
return functionNamesWithoutInternal;
}
}

@Override
public Optional<FunctionDefinition> getFunctionDefinition(String name) {
final String normalizedName = name.toUpperCase(Locale.ROOT);
return Optional.ofNullable(normalizedFunctions.get(normalizedName));
}
}
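
Besides `LOAD MODULE state` in SQL, a module like this can also be registered programmatically. A minimal sketch using the standard `TableEnvironment` module API (the surrounding setup is illustrative, not part of this PR):

```java
import org.apache.flink.state.table.module.StateModule;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LoadStateModuleExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Equivalent to executing "LOAD MODULE state" in SQL.
        tEnv.loadModule(StateModule.IDENTIFIER, StateModule.INSTANCE);

        // savepoint_metadata should now appear among the listed functions.
        for (String name : tEnv.listFunctions()) {
            System.out.println(name);
        }
    }
}
```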
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table.module;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.module.Module;

import java.util.Collections;
import java.util.Set;

/** Factory for {@link StateModule}. */
@Internal
public class StateModuleFactory implements ModuleFactory {

@Override
public String factoryIdentifier() {
return StateModule.IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}

@Override
public Module createModule(Context context) {
return StateModule.INSTANCE;
}
}
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.state.table.module.StateModuleFactory
org.apache.flink.state.table.SavepointDynamicTableSourceFactory
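
This service entry is what makes `LOAD MODULE state` work at runtime: Flink discovers factories through Java's `ServiceLoader` mechanism and matches on `factoryIdentifier()`. A simplified sketch of that lookup, assuming the service file above is on the classpath (Flink's real discovery path adds validation and classloader handling):

```java
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.ModuleFactory;

import java.util.ServiceLoader;

public class ModuleLookupSketch {
    /** Returns the first module factory whose identifier matches, e.g. "state". */
    static ModuleFactory find(String identifier) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory instanceof ModuleFactory
                    && factory.factoryIdentifier().equals(identifier)) {
                return (ModuleFactory) factory;
            }
        }
        throw new IllegalArgumentException("No module factory found for: " + identifier);
    }
}
```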
@@ -37,7 +37,7 @@
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for the state SQL reader. */
/** Unit tests for the savepoint SQL reader. */
public class SavepointDynamicTableSourceTest {
@Test
@SuppressWarnings("unchecked")
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import org.junit.jupiter.api.Test;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for the savepoint metadata SQL reader. */
public class SavepointMetadataDynamicTableSourceTest {
@Test
public void testReadMetadata() throws Exception {
Configuration config = new Configuration();
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql("LOAD MODULE state");
Table table =
tEnv.sqlQuery("SELECT * FROM savepoint_metadata('src/test/resources/table-state')");
List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
result.sort(Comparator.comparing(a -> ((String) a.getField("operator-uid-hash"))));

assertThat(result.size()).isEqualTo(7);
Iterator<Row> it = result.iterator();
assertThat(it.next().toString())
.isEqualTo(
"+I[2, Source: broadcast-source, broadcast-source-uid, 3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]");
assertThat(it.next().toString())
.isEqualTo(
"+I[2, keyed-broadcast-process, keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 4548]");
}
}