-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37598][table] Support list and map state in PTFs #26396
base: master
Are you sure you want to change the base?
Conversation
...ava/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
Show resolved
Hide resolved
...ava/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
Show resolved
Hide resolved
...nner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
Outdated
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
Show resolved
Hide resolved
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
Outdated
Show resolved
Hide resolved
...flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/DataViewUtils.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for addressing feedback
final RowData value = stateHandles[i].value(); | ||
stateToFunction[i] = value; | ||
final State stateHandle = stateHandles[i]; | ||
if (!(stateHandle instanceof ValueState)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the code would be cleaner with the if as positive and without the continue:
if ((stateHandle instanceof ValueState)) {
final ValueState<RowData> valueState = (ValueState<RowData>) stateHandle;
final RowData value = valueState.value();
valueStateToFunction[i] = value;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following the early return principle here. I think in the end this is a matter of taste. Feel free to code it differently in your PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following the early return principle here. I think in the end this is a matter of taste. Feel free to code it differently in your PRs.
ok no worries.
.getImplementationClass() | ||
.map(viewClass::isAssignableFrom) | ||
.orElse(false); | ||
if (!isDataView) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: how about rewriting the if and returns as
if (isDataView) {
viewType.getChildren().forEach(DataViewUtils::checkForInvalidDataViews);
}
return isDataView;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following the early return principle here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok no worries
.../flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
Outdated
Show resolved
Hide resolved
...ime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessTableOperator.java
Show resolved
Hide resolved
...table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
Outdated
Show resolved
Hide resolved
valueState.clear(); | ||
} else { | ||
final HashFunction hashCode = stateHashCode[pos]; | ||
final RecordEqualiser equals = stateEquals[pos]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to use the equals() method on the functions to assert whether they are equal? If we are not in control as we cannot rely on the equals as they are user written, then I can see the need for a class like the Record Equaliser. I am curious on the use of RecordEqualiser
, equaliser implies to me we are looking to make the functions equal - where as we are likely just looking to compare - maybe a RecordComparator might be a better name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are comparing internal data structures here. i.e. RowData
. Which might be backed by memory segments. Also SQL semantics need to be considered when comparing nested data. This is why RecordEqualiser
and HashFunction
are code generated instances.
@twalthr thanks for addressing the feedback. |
@flinkbot run azure |
What is the purpose of the change
This adds support for list and map state in PTFs via
ListView
andMapView
.Brief change log
Verifying this change
This change added tests and can be verified as follows:
ProcessTableFunctionTestPrograms#PROCESS_LIST_STATE
ProcessTableFunctionTestPrograms#PROCESS_MAP_STATE
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: yesDocumentation