diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java index edae579c92313..13a8ead51ee08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; @@ -195,6 +196,11 @@ public boolean isInternalSorterSupported() { @Override public void enableAsyncState() { - // nothing to do. + OneInputStreamOperator operator = + (OneInputStreamOperator) + ((SimpleOperatorFactory) operatorFactory).getOperator(); + if (!(operator instanceof AsyncStateProcessingOperator)) { + super.enableAsyncState(); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java index a29164c51b6be..48bf37c409c7e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.io.InputFormat; @@ -119,6 +120,7 @@ import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Collector; import org.apache.flink.util.SerializedValue; import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; @@ -155,6 +157,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** * Tests for {@link StreamingJobGraphGenerator} and {@link AdaptiveGraphManager}. @@ -2163,6 +2166,32 @@ void testOutputFormatSupportConcurrentExecutionAttempts() { new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true); } + @Test + void testEnableAsyncStateForSyncOperatorThrowException() throws Exception { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + try { + env.fromData(1, 2, 3, 4, 5) + .keyBy(k -> k) + .flatMap( + new FlatMapFunction() { + @Override + public void flatMap(Integer value, Collector out) + throws Exception { + out.collect(value); + } + }) + .enableAsyncState() + .print(); + fail("Enabling async state for synchronous operators is forbidden."); + } catch (UnsupportedOperationException e) { + assertThat(e.getMessage()) + .isEqualTo( + "The transformation does not support " + + "async state, or you are enabling the async state without a keyed context (not behind a keyBy())."); + } + } + private void testWhetherOutputFormatSupportsConcurrentExecutionAttempts( OutputFormat outputFormat, boolean isSupported) { final StreamExecutionEnvironment env =