Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37407][state] Add savepoint metadata SQL built-in process function #26393

Merged
merged 1 commit into from
Apr 7, 2025

Conversation

gaborgsomogyi
Copy link
Contributor

What is the purpose of the change

Savepoint metadata is useful information from user perspective but it's not yet available from SQL perspective. In this PR I've added support for this, please see FLIP512 for further details.

I've also added some tiny name rationalization in comments.

Brief change log

Added savepoint metadata SQL built-in process function.

Verifying this change

New automated test.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

| operator-uid-hash | STRING NOT NULL | Operator UID hash. |
| operator-parallelism | INT NOT NULL | Parallelism of the operator. |
| operator-max-parallelism | INT NOT NULL | Maximum parallelism of the operator. |
| operator-subtask-state-count | INT NOT NULL | Number of operator subtask states. |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

operator-subtask-state-count is an addition compared to the FLIP. I thought it's valuable information but if somebody has objection we can remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a good addition to me

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 3, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@@ -515,6 +515,28 @@ Before you interrogate state using the table API, make sure to review our [Flink

IMPORTANT NOTE: State Table API only supports keyed state.

### Metadata

The SQL connector allows users to read metadata of savepoints in the following way:
Copy link
Contributor

@davidradl davidradl Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
read metadata -> read the metadata
This is not a SQL connector - it is a new table function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

The SQL connector allows users to read metadata of savepoints in the following way:
```SQL
LOAD MODULE state;
SELECT * FROM savepoint_metadata('/root/dir/of/checkpoint-data/chk-1');
Copy link
Contributor

@davidradl davidradl Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flip talks of checkpoints and savepoint state, can we use this funciton to access checkpoint state as well? Currently the function is savepoint_metadata; if it covers both can we function change the name so it applies to both savepoints and checkpoints - maybe state_table_metadata?

If checkpoints are not included - how does it error if we point this function at the checkpoint file? Maybe a unit test for that would be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flip talks of checkpoints and savepoint state, can we use this funciton to access checkpoint state as well?

Yes. We can't name things in both ways. Initially we've agreed to call the whole state processor API things as savepoint... and the only exception here is the module because it's a reserved keyword in Flink SQL. If we go on this road, then the whole state processor API including internals must be renamed which I don't support.

SELECT * FROM savepoint_metadata('/root/dir/of/checkpoint-data/chk-1');
```

The table have the following fixed schema:
Copy link
Contributor

@davidradl davidradl Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
The table have the following fixed schema:
->
The new table function creates a table with the following fixed schema:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

@@ -26,7 +26,7 @@

import static org.apache.flink.configuration.description.TextElement.code;

/** Options for the state connector. */
/** Options for the savepoint connector. */
Copy link
Contributor

@davidradl davidradl Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is not really a connector as far as I can see.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is unrelated to the meta feature, just renamed everything to savepoint as we've agreed on. This is used by the connector...

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@Internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this internal -as it is driven directly from SQL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar classes are having such annotation.


import static org.apache.flink.table.functions.FunctionKind.TABLE;

/** Module of state in Flink. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should mention savepoint if this is only savepoint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only. As mentioned before everything is called savepoint except the module because that's a reserved keyword.


The SQL connector allows users to read metadata of savepoints in the following way:
```SQL
LOAD MODULE state;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if this is savepoints only ; should we call it savepoint-state. Currently the implementation seems to be tied to a fixed savepoint schema. savepoint-state would be more descriptive and better scoped - allowing us to provide a more generic state module later - if we want to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like all the rest of the questions related is savepoint only?. My answer to all of them is no. We've agreed on this naming in the discussion.

@Experimental
public class StateModule implements Module {

public static final String IDENTIFIER = "state";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the identified should mention the word savepoint maybe "savepoint-state"


/** Module of state in Flink. */
@Experimental
public class StateModule implements Module {
Copy link
Contributor

@davidradl davidradl Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we rename SavepointStateModule so it is more descriptive
I wonder if we should refactor CoreModule so we can extend, as the only difference seems to be the List<BuiltInFunctionDefinition> definitions

@gaborgsomogyi
Copy link
Contributor Author

cc @Zakelly

Comment on lines +57 to +59
row.setField("operator-parallelism", operatorState.getParallelism());
row.setField("operator-max-parallelism", operatorState.getMaxParallelism());
row.setField("operator-subtask-state-count", operatorState.getStates().size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, is operator-parallelism and operator-subtask-state-count providing the same value? It seems so.

Copy link
Contributor Author

@gaborgsomogyi gaborgsomogyi Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, for example broadcast source has operator-parallelism = 1 but operator-subtask-state-count = 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thus I suggest adding a more detailed description for operator-subtask-state-count in the doc. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned. That's my understanding, correct me if I'm wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a hidden gem and worth to add some more info. Your understanding is correct so I would add your sentence as-is.

private final Set<String> functionNamesWithInternal;
private final Set<String> functionNamesWithoutInternal;

public StateModule() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make constructor private since we are using singleton

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. LGTM, only one minor suggestion

Comment on lines +57 to +59
row.setField("operator-parallelism", operatorState.getParallelism());
row.setField("operator-max-parallelism", operatorState.getMaxParallelism());
row.setField("operator-subtask-state-count", operatorState.getStates().size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thus I suggest adding a more detailed description for operator-subtask-state-count in the doc. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned. That's my understanding, correct me if I'm wrong.

@gaborgsomogyi gaborgsomogyi merged commit ae09a0d into apache:master Apr 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants