Skip to content

Commit 10c4a96

Browse files
committed
Feedback addressed
1 parent f96bc7f commit 10c4a96

File tree

6 files changed

+27
-6
lines changed

6 files changed

+27
-6
lines changed

docs/content.zh/docs/dev/table/functions/ptfs.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,11 @@ State TTL is applied individually to each entry in a list or map, allowing for f
737737
elements.
738738
{{< /hint >}}
739739

740+
The following example demonstrates how to declare and use a `MapView`. It assumes the PTF processes a table with the
741+
schema `(userId, eventId, ...)`, partitioned by `userId`, with a high cardinality of distinct `eventId` values. For this
742+
use case, it is generally recommended to partition the table by both `userId` and `eventId`. For example purposes, the
743+
large state is stored as a map state.
744+
740745
{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
741746
{{< tab "Java" >}}
742747
```java

docs/content/docs/dev/table/functions/ptfs.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,11 @@ State TTL is applied individually to each entry in a list or map, allowing for f
737737
elements.
738738
{{< /hint >}}
739739

740+
The following example demonstrates how to declare and use a `MapView`. It assumes the PTF processes a table with the
741+
schema `(userId, eventId, ...)`, partitioned by `userId`, with a high cardinality of distinct `eventId` values. For this
742+
use case, it is generally recommended to partition the table by both `userId` and `eventId`. For example purposes, the
743+
large state is stored as a map state.
744+
740745
{{< tabs "1837eeed-3d13-455c-8e2f-5e164da9f844" >}}
741746
{{< tab "Java" >}}
742747
```java

flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/ListView.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,12 @@
3939
*
4040
* <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
4141
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
42+
* Aggregate functions might be used at various locations (pre-aggregation, combiners, merging of
43+
* window slides, etc.) for some of these locations the data view is not backed by state but {@link
44+
* ArrayList}.
4245
*
43-
* <p>For process table functions, the view can be used as a top-level state entry.
46+
* <p>For process table functions, the view can be used as a top-level state entry. Data views in
47+
* PTFs are always backed by state.
4448
*
4549
* <p>Note: Elements of a {@link ListView} must not be null. For heap-based state backends, {@code
4650
* hashCode/equals} of the original (i.e. external) class are used. However, the serialization

flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.table.functions.TableAggregateFunction;
2727
import org.apache.flink.table.types.DataType;
2828

29+
import java.util.ArrayList;
2930
import java.util.HashMap;
3031
import java.util.Iterator;
3132
import java.util.Map;
@@ -40,8 +41,12 @@
4041
*
4142
* <p>For aggregating functions, the view can be used as a field in the accumulator of an {@link
4243
* AggregateFunction} or {@link TableAggregateFunction} when large amounts of data are expected.
44+
* Aggregate functions might be used at various locations (pre-aggregation, combiners, merging of
45+
* window slides, etc.) for some of these locations the data view is not backed by state but {@link
46+
* ArrayList}.
4347
*
44-
* <p>For process table functions, the view can be used as a top-level state entry.
48+
* <p>For process table functions, the view can be used as a top-level state entry. Data views in
49+
* PTFs are always backed by state.
4550
*
4651
* <p>Note: Keys of a {@link MapView} must not be null. Nulls in values are supported. For
4752
* heap-based state backends, {@code hashCode/equals} of the original (i.e. external) class are
@@ -119,8 +124,9 @@ public void setMap(Map<K, V> map) {
119124
/**
120125
* Return the value for the specified key or {@code null} if the key is not in the map view.
121126
*
122-
* @param key The lookup key.
123-
* @return The value for the specified key.
127+
* @param key The key whose associated value is to be returned
128+
* @return The value to which the specified key is mapped, or {@code null} if this map contains
129+
* no mapping for the key
124130
* @throws Exception Thrown if the system cannot get data.
125131
*/
126132
public V get(K key) throws Exception {

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ object ProcessTableRunnerGenerator {
316316
val constructorCode = stateDataType.getConversionClass match {
317317
case rowType if rowType == classOf[Row] =>
318318
// This allows us to retrieve the converter term that has been generated
319-
// in genToExternalConverter(). The converter is able to created named positions
319+
// in genToExternalConverter(). The converter is able to create named positions
320320
// for row fields.
321321
val converterTerm = ctx.addReusableConverter(stateDataType)
322322
s"((${className[RowRowConverter]}) $converterTerm).createEmptyRow()"

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/dataview/DataViewUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public static boolean isDataView(LogicalType viewType, Class<? extends DataView>
7777
/** Checks that the given type and its children do not contain data views. */
7878
public static void checkForInvalidDataViews(LogicalType type) {
7979
if (hasNested(type, t -> isDataView(t, DataView.class))) {
80-
throw new ValidationException("Data views are not supported at the declared location.");
80+
throw new ValidationException(
81+
"Data views are not supported at the declared location. Given type: " + type);
8182
}
8283
}
8384

0 commit comments

Comments
 (0)