Skip to content

Commit e549d69

Browse files
committed
[FLINK-37598][table] Support list and map state in PTFs
1 parent 076fa80 commit e549d69

File tree

20 files changed

+606
-320
lines changed

20 files changed

+606
-320
lines changed

docs/content.zh/docs/dev/table/functions/ptfs.md

+74
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,80 @@ class CountingFunction extends ProcessTableFunction<String> {
708708
{{< /tab >}}
709709
{{< /tabs >}}
710710

711+
### Large State
712+
713+
Flink's state backends provide different types of state to efficiently handle large state.
714+
715+
Currently, PTFs support three types of state:
716+
717+
- **Value state**: Represents a single value.
718+
- **List state**: Represents a list of values, supporting operations like appending, removing, and iterating.
719+
- **Map state**: Represents a map (key-value pair) for efficient lookups, modifications, and removal of individual entries.
720+
721+
By default, state entries in a PTF are represented as value state. This means that every state entry is fully read from
722+
the state backend when the evaluation method is called, and the value is written back to the state backend once the
723+
evaluation method finishes.
724+
725+
To optimize state access and avoid unnecessary (de)serialization, state entries can be declared as:
726+
- `org.apache.flink.table.api.dataview.ListView` (for list state)
727+
- `org.apache.flink.table.api.dataview.MapView` (for map state)
728+
729+
These provide direct views to the underlying Flink state backend.
730+
731+
For example, when using a `MapView`, accessing a value via `MapView#get` will only deserialize the value associated with
732+
the specified key. This allows for efficient access to individual entries without needing to load the entire map. This
733+
approach is particularly useful when the map does not fit entirely into memory.
734+
735+
{{< hint info >}}
736+
State TTL is applied individually to each entry in a list or map, allowing for fine-grained expiration control over state
737+
elements.
738+
{{< /hint >}}
739+
740+
{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
741+
{{< tab "Java" >}}
742+
```java
743+
// Function that uses a map view for storing a large map for an event history per user
744+
class LargeHistoryFunction extends ProcessTableFunction<String> {
745+
public void eval(
746+
@StateHint MapView<String, Integer> largeMemory,
747+
@ArgumentHint(TABLE_AS_SET) Row input
748+
) {
749+
String eventId = input.getFieldAs("eventId");
750+
Integer count = largeMemory.get(eventId);
751+
if (count == null) {
752+
largeMemory.put(eventId, 1);
753+
} else {
754+
if (count > 1000) {
755+
collect("Anomaly detected: " + eventId);
756+
}
757+
largeMemory.put(eventId, count + 1);
758+
}
759+
}
760+
}
761+
```
762+
{{< /tab >}}
763+
{{< /tabs >}}
764+
765+
Similar to other data types, reflection is used to extract the necessary type information. If reflection is not
766+
feasible - such as when a `Row` object is involved - type hints can be provided. Use the `ARRAY` data type for list views
767+
and the `MAP` data type for map views.
768+
769+
{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}}
770+
{{< tab "Java" >}}
771+
```java
772+
// Function that uses a list view of rows
773+
class LargeHistoryFunction extends ProcessTableFunction<String> {
774+
public void eval(
775+
@StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
776+
@ArgumentHint(TABLE_AS_SET) Row input
777+
) {
778+
...
779+
}
780+
}
781+
```
782+
{{< /tab >}}
783+
{{< /tabs >}}
784+
711785
### Efficiency and Design Principles
712786

713787
A stateful function also means that data layout and data retention should be well thought

docs/content/docs/dev/table/functions/ptfs.md

+74
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,80 @@ class CountingFunction extends ProcessTableFunction<String> {
708708
{{< /tab >}}
709709
{{< /tabs >}}
710710

711+
### Large State
712+
713+
Flink's state backends provide different types of state to efficiently handle large state.
714+
715+
Currently, PTFs support three types of state:
716+
717+
- **Value state**: Represents a single value.
718+
- **List state**: Represents a list of values, supporting operations like appending, removing, and iterating.
719+
- **Map state**: Represents a map (key-value pair) for efficient lookups, modifications, and removal of individual entries.
720+
721+
By default, state entries in a PTF are represented as value state. This means that every state entry is fully read from
722+
the state backend when the evaluation method is called, and the value is written back to the state backend once the
723+
evaluation method finishes.
724+
725+
To optimize state access and avoid unnecessary (de)serialization, state entries can be declared as:
726+
- `org.apache.flink.table.api.dataview.ListView` (for list state)
727+
- `org.apache.flink.table.api.dataview.MapView` (for map state)
728+
729+
These provide direct views to the underlying Flink state backend.
730+
731+
For example, when using a `MapView`, accessing a value via `MapView#get` will only deserialize the value associated with
732+
the specified key. This allows for efficient access to individual entries without needing to load the entire map. This
733+
approach is particularly useful when the map does not fit entirely into memory.
734+
735+
{{< hint info >}}
736+
State TTL is applied individually to each entry in a list or map, allowing for fine-grained expiration control over state
737+
elements.
738+
{{< /hint >}}
739+
740+
{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
741+
{{< tab "Java" >}}
742+
```java
743+
// Function that uses a map view for storing a large map for an event history per user
744+
class LargeHistoryFunction extends ProcessTableFunction<String> {
745+
public void eval(
746+
@StateHint MapView<String, Integer> largeMemory,
747+
@ArgumentHint(TABLE_AS_SET) Row input
748+
) {
749+
String eventId = input.getFieldAs("eventId");
750+
Integer count = largeMemory.get(eventId);
751+
if (count == null) {
752+
largeMemory.put(eventId, 1);
753+
} else {
754+
if (count > 1000) {
755+
collect("Anomaly detected: " + eventId);
756+
}
757+
largeMemory.put(eventId, count + 1);
758+
}
759+
}
760+
}
761+
```
762+
{{< /tab >}}
763+
{{< /tabs >}}
764+
765+
Similar to other data types, reflection is used to extract the necessary type information. If reflection is not
766+
feasible - such as when a `Row` object is involved - type hints can be provided. Use the `ARRAY` data type for list views
767+
and the `MAP` data type for map views.
768+
769+
{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}}
770+
{{< tab "Java" >}}
771+
```java
772+
// Function that uses a list view of rows
773+
class LargeHistoryFunction extends ProcessTableFunction<String> {
774+
public void eval(
775+
@StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
776+
@ArgumentHint(TABLE_AS_SET) Row input
777+
) {
778+
...
779+
}
780+
}
781+
```
782+
{{< /tab >}}
783+
{{< /tabs >}}
784+
711785
### Efficiency and Design Principles
712786

713787
A stateful function also means that data layout and data retention should be well thought

flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/DataView.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
package org.apache.flink.table.api.dataview;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22-
import org.apache.flink.table.functions.ImperativeAggregateFunction;
22+
import org.apache.flink.table.functions.ProcessTableFunction;
2323

2424
/**
25-
* A {@link DataView} is a collection type that can be used in the accumulator of an {@link
26-
* ImperativeAggregateFunction}.
25+
* A {@link DataView} is a collection type that can be used in the accumulator of aggregating
26+
* functions and as a state entry in {@link ProcessTableFunction}s.
2727
*
2828
* <p>Depending on the context in which the function is used, a {@link DataView} can be backed by a
2929
* Java heap collection or a state backend.

flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
import java.util.Objects;
3232

3333
/**
34-
* A {@link DataView} that provides {@link List}-like functionality in the accumulator of an {@link
35-
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
34+
* A {@link DataView} that provides {@link List}-like functionality in state entries.
3635
*
3736
* <p>A {@link ListView} can be backed by a Java {@link ArrayList} or can leverage Flink's state
38-
* backends depending on the context in which the aggregate function is used. In many unbounded data
39-
* scenarios, the {@link ListView} delegates all calls to a {@link ListState} instead of the {@link
40-
* ArrayList}.
37+
* backends depending on the context. In many unbounded data scenarios, the {@link ListView}
38+
* delegates all calls to a {@link ListState} instead of the {@link ArrayList}.
39+
*
40+
* <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
41+
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
42+
*
43+
* <p>For process table functions, the view can be used as a top-level state entry.
4144
*
4245
* <p>Note: Elements of a {@link ListView} must not be null. For heap-based state backends, {@code
4346
* hashCode/equals} of the original (i.e. external) class are used. However, the serialization
@@ -57,15 +60,15 @@
5760
* public ListView<String> list = new ListView<>();
5861
*
5962
* // or explicit:
60-
* // {@literal @}DataTypeHint("ARRAY<STRING>")
63+
* // @DataTypeHint("ARRAY < STRING >")
6164
* // public ListView<String> list = new ListView<>();
6265
*
6366
* public long count = 0L;
6467
* }
6568
*
6669
* public class MyAggregateFunction extends AggregateFunction<String, MyAccumulator> {
6770
*
68-
* {@literal @}Override
71+
* @Override
6972
* public MyAccumulator createAccumulator() {
7073
* return new MyAccumulator();
7174
* }
@@ -75,7 +78,7 @@
7578
* accumulator.count++;
7679
* }
7780
*
78-
* {@literal @}Override
81+
* @Override
7982
* public String getValue(MyAccumulator accumulator) {
8083
* // return the count and the joined elements
8184
* return count + ": " + String.join("|", acc.list.get());
@@ -84,9 +87,6 @@
8487
*
8588
* }</pre>
8689
*
87-
* <p>{@code ListView(TypeInformation<?> elementType)} method was deprecated and then removed.
88-
* Please use a {@link DataTypeHint} instead.
89-
*
9090
* @param <T> element type
9191
*/
9292
@PublicEvolving
@@ -152,7 +152,7 @@ public boolean remove(T value) throws Exception {
152152
return list.remove(value);
153153
}
154154

155-
/** Removes all of the elements from this list view. */
155+
/** Removes all elements from this list view. */
156156
@Override
157157
public void clear() {
158158
list.clear();

flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@
3232
import java.util.Objects;
3333

3434
/**
35-
* A {@link DataView} that provides {@link Map}-like functionality in the accumulator of an {@link
36-
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
35+
* A {@link DataView} that provides {@link Map}-like functionality in state entries.
3736
*
3837
* <p>A {@link MapView} can be backed by a Java {@link HashMap} or can leverage Flink's state
39-
* backends depending on the context in which the aggregate function is used. In many unbounded data
40-
* scenarios, the {@link MapView} delegates all calls to a {@link MapState} instead of the {@link
41-
* HashMap}.
38+
* backends depending on the context. In many unbounded data scenarios, the {@link MapView}
39+
* delegates all calls to a {@link MapState} instead of the {@link HashMap}.
40+
*
41+
* <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
42+
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
43+
*
44+
* <p>For process table functions, the view can be used as a top-level state entry.
4245
*
4346
* <p>Note: Keys of a {@link MapView} must not be null. Nulls in values are supported. For
4447
* heap-based state backends, {@code hashCode/equals} of the original (i.e. external) class are
@@ -58,15 +61,15 @@
5861
* public MapView<String, Integer> map = new MapView<>();
5962
*
6063
* // or explicit:
61-
* // {@literal @}DataTypeHint("MAP<STRING, INT>")
64+
* // @DataTypeHint("MAP < STRING, INT >")
6265
* // public MapView<String, Integer> map = new MapView<>();
6366
*
6467
* public long count;
6568
* }
6669
*
6770
* public class MyAggregateFunction extends AggregateFunction<Long, MyAccumulator> {
6871
*
69-
* {@literal @}Override
72+
* @Override
7073
* public MyAccumulator createAccumulator() {
7174
* return new MyAccumulator();
7275
* }
@@ -78,17 +81,14 @@
7881
* }
7982
* }
8083
*
81-
* {@literal @}Override
84+
* @Override
8285
* public Long getValue(MyAccumulator accumulator) {
8386
* return accumulator.count;
8487
* }
8588
* }
8689
*
8790
* }</pre>
8891
*
89-
* <p>{@code MapView(TypeInformation<?> keyType, TypeInformation<?> valueType)} method was
90-
* deprecated and removed. Please use a {@link DataTypeHint} instead.
91-
*
9292
* @param <K> key type
9393
* @param <V> value type
9494
*/
@@ -119,7 +119,7 @@ public void setMap(Map<K, V> map) {
119119
/**
120120
* Return the value for the specified key or {@code null} if the key is not in the map view.
121121
*
122-
* @param key The look up key.
122+
* @param key The lookup key.
123123
* @return The value for the specified key.
124124
* @throws Exception Thrown if the system cannot get data.
125125
*/

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java

+50
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.flink.table.annotation.DataTypeHint;
2525
import org.apache.flink.table.annotation.FunctionHint;
2626
import org.apache.flink.table.annotation.StateHint;
27+
import org.apache.flink.table.api.dataview.ListView;
28+
import org.apache.flink.table.api.dataview.MapView;
2729
import org.apache.flink.table.catalog.DataTypeFactory;
2830
import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
2931
import org.apache.flink.table.types.inference.TypeInference;
@@ -290,6 +292,54 @@
290292
* }
291293
* }</pre>
292294
*
295+
* <h2>Large State</h2>
296+
*
297+
* <p>Flink's state backends provide different types of state to efficiently handle large state.
298+
*
299+
* <p>Currently, PTFs support three types of state:
300+
*
301+
* <ul>
302+
* <li><b>Value state</b>: Represents a single value.
303+
* <li><b>List state</b>: Represents a list of values, supporting operations like appending,
304+
* removing, and iterating.
305+
* <li><b>Map state</b>: Represents a map (key-value pair) for efficient lookups, modifications,
306+
* and removal of individual entries.
307+
* </ul>
308+
*
309+
* <p>By default, state entries in a PTF are represented as value state. This means that every state
310+
* entry is fully read from the state backend when the evaluation method is called, and the value is
311+
* written back to the state backend once the evaluation method finishes.
312+
*
313+
* <p>To optimize state access and avoid unnecessary (de)serialization, state entries can be
314+
* declared as {@link ListView} or {@link MapView}. These provide direct views to the underlying
315+
* Flink state backend.
316+
*
317+
* <p>For example, when using a {@link MapView}, accessing a value via {@link MapView#get(Object)}
318+
* will only deserialize the value associated with the specified key. This allows for efficient
319+
* access to individual entries without needing to load the entire map. This approach is
320+
* particularly useful when the map does not fit entirely into memory.
321+
*
322+
* <p>State TTL is applied individually to each entry in a list or map, allowing for fine-grained
323+
* expiration control over state elements.
324+
*
325+
* <pre>{@code
326+
* // Function that uses a map view for storing a large map for an event history per user
327+
* class HistoryFunction extends ProcessTableFunction<String> {
328+
* public void eval(@StateHint MapView<String, Integer> largeMemory, @ArgumentHint(TABLE_AS_SET) Row input) {
329+
* String eventId = input.getFieldAs("eventId");
330+
* Integer count = largeMemory.get(eventId);
331+
* if (count == null) {
332+
* largeMemory.put(eventId, 1);
333+
* } else {
334+
* if (count > 1000) {
335+
* collect("Anomaly detected: " + eventId);
336+
* }
337+
* largeMemory.put(eventId, count + 1);
338+
* }
339+
* }
340+
* }
341+
* }</pre>
342+
*
293343
* <h1>Time and Timers</h1>
294344
*
295345
* <p>A PTF supports event time natively. Time-based services are available via {@link

0 commit comments

Comments
 (0)