Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37598][table] Support list and map state in PTFs #26396

Merged
merged 4 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,85 @@ class CountingFunction extends ProcessTableFunction<String> {
{{< /tab >}}
{{< /tabs >}}

### Large State

Flink's state backends provide different types of state to efficiently handle large state.

Currently, PTFs support three types of state:

- **Value state**: Represents a single value.
- **List state**: Represents a list of values, supporting operations like appending, removing, and iterating.
- **Map state**: Represents a map (key-value pair) for efficient lookups, modifications, and removal of individual entries.

By default, state entries in a PTF are represented as value state. This means that every state entry is fully read from
the state backend when the evaluation method is called, and the value is written back to the state backend once the
evaluation method finishes.

To optimize state access and avoid unnecessary (de)serialization, state entries can be declared as:
- `org.apache.flink.table.api.dataview.ListView` (for list state)
- `org.apache.flink.table.api.dataview.MapView` (for map state)

These provide direct views to the underlying Flink state backend.

For example, when using a `MapView`, accessing a value via `MapView#get` will only deserialize the value associated with
the specified key. This allows for efficient access to individual entries without needing to load the entire map. This
approach is particularly useful when the map does not fit entirely into memory.

{{< hint info >}}
State TTL is applied individually to each entry in a list or map, allowing for fine-grained expiration control over state
elements.
{{< /hint >}}

The following example demonstrates how to declare and use a `MapView`. It assumes the PTF processes a table with the
schema `(userId, eventId, ...)`, partitioned by `userId`, with a high cardinality of distinct `eventId` values. For this
use case, it is generally recommended to partition the table by both `userId` and `eventId`. For example purposes, the
large state is stored as a map state.

{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
```java
// Function that uses a map view for storing a large map for an event history per user
class LargeHistoryFunction extends ProcessTableFunction<String> {
public void eval(
@StateHint MapView<String, Integer> largeMemory,
@ArgumentHint(TABLE_AS_SET) Row input
) {
String eventId = input.getFieldAs("eventId");
Integer count = largeMemory.get(eventId);
if (count == null) {
largeMemory.put(eventId, 1);
} else {
if (count > 1000) {
collect("Anomaly detected: " + eventId);
}
largeMemory.put(eventId, count + 1);
}
}
}
```
{{< /tab >}}
{{< /tabs >}}

Similar to other data types, reflection is used to extract the necessary type information. If reflection is not
feasible - such as when a `Row` object is involved - type hints can be provided. Use the `ARRAY` data type for list views
and the `MAP` data type for map views.

{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
```java
// Function that uses a list view of rows
class LargeHistoryFunction extends ProcessTableFunction<String> {
public void eval(
@StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
@ArgumentHint(TABLE_AS_SET) Row input
) {
...
}
}
```
{{< /tab >}}
{{< /tabs >}}

### Efficiency and Design Principles

A stateful function also means that data layout and data retention should be well thought
Expand Down
79 changes: 79 additions & 0 deletions docs/content/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,85 @@ class CountingFunction extends ProcessTableFunction<String> {
{{< /tab >}}
{{< /tabs >}}

### Large State

Flink's state backends provide different types of state to efficiently handle large state.

Currently, PTFs support three types of state:

- **Value state**: Represents a single value.
- **List state**: Represents a list of values, supporting operations like appending, removing, and iterating.
- **Map state**: Represents a map (key-value pair) for efficient lookups, modifications, and removal of individual entries.

By default, state entries in a PTF are represented as value state. This means that every state entry is fully read from
the state backend when the evaluation method is called, and the value is written back to the state backend once the
evaluation method finishes.

To optimize state access and avoid unnecessary (de)serialization, state entries can be declared as:
- `org.apache.flink.table.api.dataview.ListView` (for list state)
- `org.apache.flink.table.api.dataview.MapView` (for map state)

These provide direct views to the underlying Flink state backend.

For example, when using a `MapView`, accessing a value via `MapView#get` will only deserialize the value associated with
the specified key. This allows for efficient access to individual entries without needing to load the entire map. This
approach is particularly useful when the map does not fit entirely into memory.

{{< hint info >}}
State TTL is applied individually to each entry in a list or map, allowing for fine-grained expiration control over state
elements.
{{< /hint >}}

The following example demonstrates how to declare and use a `MapView`. It assumes the PTF processes a table with the
schema `(userId, eventId, ...)`, partitioned by `userId`, with a high cardinality of distinct `eventId` values. For this
use case, it is generally recommended to partition the table by both `userId` and `eventId`. For example purposes, the
large state is stored as a map state.

{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
```java
// Function that uses a map view for storing a large map for an event history per user
class LargeHistoryFunction extends ProcessTableFunction<String> {
public void eval(
@StateHint MapView<String, Integer> largeMemory,
@ArgumentHint(TABLE_AS_SET) Row input
) {
String eventId = input.getFieldAs("eventId");
Integer count = largeMemory.get(eventId);
if (count == null) {
largeMemory.put(eventId, 1);
} else {
if (count > 1000) {
collect("Anomaly detected: " + eventId);
}
largeMemory.put(eventId, count + 1);
}
}
}
```
{{< /tab >}}
{{< /tabs >}}

Similar to other data types, reflection is used to extract the necessary type information. If reflection is not
feasible - such as when a `Row` object is involved - type hints can be provided. Use the `ARRAY` data type for list views
and the `MAP` data type for map views.

{{< tabs "1937eeed-3d13-455c-8e2f-5e164da9f844" >}}
{{< tab "Java" >}}
```java
// Function that uses a list view of rows
class LargeHistoryFunction extends ProcessTableFunction<String> {
public void eval(
@StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
@ArgumentHint(TABLE_AS_SET) Row input
) {
...
}
}
```
{{< /tab >}}
{{< /tabs >}}

### Efficiency and Design Principles

A stateful function also means that data layout and data retention should be well thought
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.flink.table.api.dataview;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.functions.ImperativeAggregateFunction;
import org.apache.flink.table.functions.ProcessTableFunction;

/**
* A {@link DataView} is a collection type that can be used in the accumulator of an {@link
* ImperativeAggregateFunction}.
* A {@link DataView} is a collection type that can be used in the accumulator of aggregating
* functions and as a state entry in {@link ProcessTableFunction}s.
*
* <p>Depending on the context in which the function is used, a {@link DataView} can be backed by a
* Java heap collection or a state backend.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,21 @@
import java.util.Objects;

/**
* A {@link DataView} that provides {@link List}-like functionality in the accumulator of an {@link
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
* A {@link DataView} that provides {@link List}-like functionality in state entries.
*
* <p>A {@link ListView} can be backed by a Java {@link ArrayList} or can leverage Flink's state
* backends depending on the context in which the aggregate function is used. In many unbounded data
* scenarios, the {@link ListView} delegates all calls to a {@link ListState} instead of the {@link
* backends depending on the context. In many unbounded data scenarios, the {@link ListView}
* delegates all calls to a {@link ListState} instead of the {@link ArrayList}.
*
* <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
* Aggregate functions might be used at various locations (pre-aggregation, combiners, merging of
* window slides, etc.) for some of these locations the data view is not backed by state but {@link
* ArrayList}.
*
* <p>For process table functions, the view can be used as a top-level state entry. Data views in
* PTFs are always backed by state.
*
* <p>Note: Elements of a {@link ListView} must not be null. For heap-based state backends, {@code
* hashCode/equals} of the original (i.e. external) class are used. However, the serialization
* format will use internal data structures.
Expand All @@ -57,15 +64,15 @@
* public ListView<String> list = new ListView<>();
*
* // or explicit:
* // {@literal @}DataTypeHint("ARRAY<STRING>")
* // @DataTypeHint("ARRAY < STRING >")
* // public ListView<String> list = new ListView<>();
*
* public long count = 0L;
* }
*
* public class MyAggregateFunction extends AggregateFunction<String, MyAccumulator> {
*
* {@literal @}Override
* @Override
* public MyAccumulator createAccumulator() {
* return new MyAccumulator();
* }
Expand All @@ -75,7 +82,7 @@
* accumulator.count++;
* }
*
* {@literal @}Override
* @Override
* public String getValue(MyAccumulator accumulator) {
* // return the count and the joined elements
* return count + ": " + String.join("|", acc.list.get());
Expand All @@ -84,9 +91,6 @@
*
* }</pre>
*
* <p>{@code ListView(TypeInformation<?> elementType)} method was deprecated and then removed.
* Please use a {@link DataTypeHint} instead.
*
* @param <T> element type
*/
@PublicEvolving
Expand Down Expand Up @@ -152,7 +156,7 @@ public boolean remove(T value) throws Exception {
return list.remove(value);
}

/** Removes all of the elements from this list view. */
/** Removes all elements from this list view. */
@Override
public void clear() {
list.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.types.DataType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/**
* A {@link DataView} that provides {@link Map}-like functionality in the accumulator of an {@link
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
* A {@link DataView} that provides {@link Map}-like functionality in state entries.
*
* <p>A {@link MapView} can be backed by a Java {@link HashMap} or can leverage Flink's state
* backends depending on the context in which the aggregate function is used. In many unbounded data
* scenarios, the {@link MapView} delegates all calls to a {@link MapState} instead of the {@link
* HashMap}.
* backends depending on the context. In many unbounded data scenarios, the {@link MapView}
* delegates all calls to a {@link MapState} instead of the {@link HashMap}.
*
* <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
* Aggregate functions might be used at various locations (pre-aggregation, combiners, merging of
* window slides, etc.) for some of these locations the data view is not backed by state but {@link
* ArrayList}.
*
* <p>For process table functions, the view can be used as a top-level state entry. Data views in
* PTFs are always backed by state.
*
* <p>Note: Keys of a {@link MapView} must not be null. Nulls in values are supported. For
* heap-based state backends, {@code hashCode/equals} of the original (i.e. external) class are
Expand All @@ -58,15 +66,15 @@
* public MapView<String, Integer> map = new MapView<>();
*
* // or explicit:
* // {@literal @}DataTypeHint("MAP<STRING, INT>")
* // @DataTypeHint("MAP < STRING, INT >")
* // public MapView<String, Integer> map = new MapView<>();
*
* public long count;
* }
*
* public class MyAggregateFunction extends AggregateFunction<Long, MyAccumulator> {
*
* {@literal @}Override
* @Override
* public MyAccumulator createAccumulator() {
* return new MyAccumulator();
* }
Expand All @@ -78,17 +86,14 @@
* }
* }
*
* {@literal @}Override
* @Override
* public Long getValue(MyAccumulator accumulator) {
* return accumulator.count;
* }
* }
*
* }</pre>
*
* <p>{@code MapView(TypeInformation<?> keyType, TypeInformation<?> valueType)} method was
* deprecated and removed. Please use a {@link DataTypeHint} instead.
*
* @param <K> key type
* @param <V> value type
*/
Expand Down Expand Up @@ -119,8 +124,9 @@ public void setMap(Map<K, V> map) {
/**
* Return the value for the specified key or {@code null} if the key is not in the map view.
*
* @param key The look up key.
* @return The value for the specified key.
* @param key The key whose associated value is to be returned
* @return The value to which the specified key is mapped, or {@code null} if this map contains
* no mapping for the key
* @throws Exception Thrown if the system cannot get data.
*/
public V get(K key) throws Exception {
Expand Down
Loading