
Commit 61de85e

[FLINK-36322] Fix compile based on 2.0-preview
1 parent 26454e6 commit 61de85e

29 files changed: +105 -187 lines

Diff for: src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java

+1 -1

@@ -24,7 +24,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
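Throughout this commit, the SinkFunction-based sinks move to the org.apache.flink.streaming.api.functions.sink.legacy package, so only the imports change while the addSink(...) call sites stay as they are. A minimal sketch of the new import in use (the class name and the fromData source are illustrative assumptions, not taken from this file):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;

public class LegacyDiscardingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same DiscardingSink as before; only the package it is imported from changed.
        env.fromData(1, 2, 3).addSink(new DiscardingSink<>());
        env.execute("legacy sink sketch");
    }
}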

Diff for: src/main/java/org/apache/flink/benchmark/BackpressureUtils.java

+1 -1

@@ -73,7 +73,7 @@ private static RestClusterClient<StandaloneClusterId> createClient(
             int port, Configuration clientConfiguration) throws Exception {
         final Configuration clientConfig = new Configuration();
         clientConfig.addAll(clientConfiguration);
-        clientConfig.setInteger(RestOptions.PORT, port);
+        clientConfig.set(RestOptions.PORT, port);
         return new RestClusterClient<>(clientConfig, StandaloneClusterId.getInstance());
     }

Diff for: src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java

+9 -57

@@ -60,22 +60,6 @@ public void compressedFilePartition(CompressedFileEnvironmentContext context) th
         executeBenchmark(context.env);
     }
 
-    @Benchmark
-    public void uncompressedMmapPartition(UncompressedMmapEnvironmentContext context)
-            throws Exception {
-        executeBenchmark(context.env);
-    }
-
-    @Benchmark
-    public void compressedSortPartition(CompressedSortEnvironmentContext context) throws Exception {
-        executeBenchmark(context.env);
-    }
-
-    @Benchmark
-    public void uncompressedSortPartition(UncompressedSortEnvironmentContext context) throws Exception {
-        executeBenchmark(context.env);
-    }
-
     private void executeBenchmark(StreamExecutionEnvironment env) throws Exception {
         StreamGraph streamGraph =
                 StreamGraphUtils.buildGraphForBatchJob(env, RECORDS_PER_INVOCATION);
@@ -102,23 +86,15 @@ public void setUp() throws Exception {
         }
 
         protected Configuration createConfiguration(
-                boolean compressionEnabled, String subpartitionType, boolean isSortShuffle) {
+                boolean compressionEnabled) {
             Configuration configuration = super.createConfiguration();
 
-            if (isSortShuffle) {
-                configuration.setInteger(
-                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-            } else {
-                configuration.setInteger(
-                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
-                        Integer.MAX_VALUE);
-            }
-            configuration.setBoolean(
-                    NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED,
-                    compressionEnabled);
-            configuration.setString(
-                    NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, subpartitionType);
-            configuration.setString(
+            configuration.set(
+                    NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC,
+                    compressionEnabled ?
+                            NettyShuffleEnvironmentOptions.CompressionCodec.LZ4
+                            : NettyShuffleEnvironmentOptions.CompressionCodec.NONE);
+            configuration.set(
                     CoreOptions.TMP_DIRS,
                     FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
             return configuration;
@@ -129,39 +105,15 @@ public static class UncompressedFileEnvironmentContext
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(false, "file", false);
+            return createConfiguration(false);
         }
     }
 
     public static class CompressedFileEnvironmentContext
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(true, "file", false);
-        }
-    }
-
-    public static class UncompressedMmapEnvironmentContext
-            extends BlockingPartitionEnvironmentContext {
-        @Override
-        protected Configuration createConfiguration() {
-            return createConfiguration(false, "mmap", false);
-        }
-    }
-
-    public static class CompressedSortEnvironmentContext
-            extends BlockingPartitionEnvironmentContext {
-        @Override
-        protected Configuration createConfiguration() {
-            return createConfiguration(true, "file", true);
-        }
-    }
-
-    public static class UncompressedSortEnvironmentContext
-            extends BlockingPartitionEnvironmentContext {
-        @Override
-        protected Configuration createConfiguration() {
-            return createConfiguration(false, "file", true);
+            return createConfiguration(true);
         }
     }
 }
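The pattern in this file recurs across the commit: the type-specific setters (setInteger, setBoolean, setString) give way to the generic Configuration.set(ConfigOption, value), and blocking-shuffle compression is now selected through a codec enum instead of a boolean flag plus a subpartition-type string. A small sketch of the new style, reusing only options that appear in the diff above (the class name and temp-dir path are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;

public class ShuffleCompressionConfigSketch {
    static Configuration withCompression(boolean compressionEnabled) {
        Configuration configuration = new Configuration();
        // Compression is chosen per codec; NONE disables it (the old boolean flag is gone).
        configuration.set(
                NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC,
                compressionEnabled
                        ? NettyShuffleEnvironmentOptions.CompressionCodec.LZ4
                        : NettyShuffleEnvironmentOptions.CompressionCodec.NONE);
        // Placeholder directory for spilled blocking-partition files.
        configuration.set(CoreOptions.TMP_DIRS, "/tmp/flink-shuffle");
        return configuration;
    }
}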

Diff for: src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java

+3 -28

@@ -48,14 +48,6 @@ public static void main(String[] args) throws RunnerException {
         new Runner(options).run();
     }
 
-    @Benchmark
-    public void remoteFilePartition(RemoteFileEnvironmentContext context) throws Exception {
-        StreamGraph streamGraph =
-                StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
-        context.miniCluster.executeJobBlocking(
-                StreamingJobGraphGenerator.createJobGraph(streamGraph));
-    }
-
     @Benchmark
     public void remoteSortPartition(RemoteSortEnvironmentContext context) throws Exception {
         StreamGraph streamGraph =
@@ -75,20 +67,10 @@ public void setUp() throws Exception {
             env.setBufferTimeout(-1);
         }
 
-        protected Configuration createConfiguration(boolean isSortShuffle) {
+        protected Configuration createConfiguration() {
            Configuration configuration = super.createConfiguration();
 
-            if (isSortShuffle) {
-                configuration.setInteger(
-                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-            } else {
-                configuration.setInteger(
-                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
-                        Integer.MAX_VALUE);
-            }
-            configuration.setString(
-                    NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
-            configuration.setString(
+            configuration.set(
                     CoreOptions.TMP_DIRS,
                     FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
             return configuration;
@@ -100,17 +82,10 @@ protected int getNumberOfVertices() {
         }
     }
 
-    public static class RemoteFileEnvironmentContext extends BlockingPartitionEnvironmentContext {
-        @Override
-        protected Configuration createConfiguration() {
-            return createConfiguration(false);
-        }
-    }
-
     public static class RemoteSortEnvironmentContext extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(true);
+            return super.createConfiguration();
         }
     }
 }

Diff for: src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java

+7 -7

@@ -19,14 +19,14 @@
 package org.apache.flink.benchmark;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import java.time.Duration;
@@ -93,31 +93,31 @@ protected Configuration createConfiguration() {
     public enum CheckpointMode {
         UNALIGNED(
                 config -> {
-                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
+                    config.set(CheckpointingOptions.ENABLE_UNALIGNED, true);
                     config.set(
                             TaskManagerOptions.MEMORY_SEGMENT_SIZE,
                             CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
                     config.set(
-                            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
                             Duration.ofMillis(0));
                     config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
                     return config;
                 }),
         UNALIGNED_1(
                 config -> {
-                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
+                    config.set(CheckpointingOptions.ENABLE_UNALIGNED, true);
                     config.set(
                             TaskManagerOptions.MEMORY_SEGMENT_SIZE,
                             CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
                     config.set(
-                            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
                             Duration.ofMillis(1));
                     config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
                     return config;
                 }),
         ALIGNED(
                 config -> {
-                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+                    config.set(CheckpointingOptions.ENABLE_UNALIGNED, false);
                     config.set(
                             TaskManagerOptions.MEMORY_SEGMENT_SIZE,
                             CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
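The checkpointing options used here move from ExecutionCheckpointingOptions (flink-streaming-java) to CheckpointingOptions (flink-core); the option semantics are unchanged. A minimal sketch mirroring the UNALIGNED branch above, with the values taken from the diff (class and method names are illustrative):

import java.time.Duration;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

public class UnalignedCheckpointConfigSketch {
    static Configuration unaligned(Configuration config) {
        // Enable unaligned checkpoints and switch to them immediately (0 ms aligned timeout).
        config.set(CheckpointingOptions.ENABLE_UNALIGNED, true);
        config.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(0));
        // Disable buffer debloating so network buffer sizes stay fixed during the run.
        config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
        return config;
    }
}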

Diff for: src/main/java/org/apache/flink/benchmark/CollectSink.java

+1 -1

@@ -18,7 +18,7 @@
 
 package org.apache.flink.benchmark;
 
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 
 import java.util.ArrayList;
 import java.util.List;

Diff for: src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java

+2 -4

@@ -18,19 +18,18 @@
 package org.apache.flink.benchmark;
 
 import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 
 import joptsimple.internal.Strings;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.runner.Runner;
@@ -76,7 +75,6 @@ public static void main(String[] args) throws RunnerException {
     public void readFileSplit(FlinkEnvironmentContext context) throws Exception {
         TARGET_COUNT_REACHED_LATCH.reset();
         StreamExecutionEnvironment env = context.env;
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
         env.enableCheckpointing(100)
                 .setParallelism(1)
                 .addSource(new MockSourceFunction())

Diff for: src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java

+10 -10

@@ -18,15 +18,14 @@
 
 package org.apache.flink.benchmark;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
 
@@ -54,6 +53,8 @@ public void setUp() throws Exception {
             throw new RuntimeException("setUp was called multiple times!");
         }
         final Configuration clusterConfig = createConfiguration();
+        clusterConfig.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+        clusterConfig.set(StateBackendOptions.STATE_BACKEND, "hashmap");
         miniCluster =
                 new MiniCluster(
                         new MiniClusterConfiguration.Builder()
@@ -78,8 +79,6 @@ public void setUp() throws Exception {
         if (objectReuse) {
             env.getConfig().enableObjectReuse();
         }
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        env.setStateBackend(new MemoryStateBackend());
     }
 
     @TearDown
@@ -102,14 +101,15 @@ public void execute() throws Exception {
 
     protected Configuration createConfiguration() {
         final Configuration configuration = new Configuration();
-        configuration.setString(RestOptions.BIND_PORT, "0");
-        configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS);
+        configuration.set(RestOptions.BIND_PORT, "0");
+        // no equivalent config available.
+        //configuration.setInteger(
+        //        NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS);
         configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME);
         configuration.set(DeploymentOptions.ATTACHED, true);
         // It doesn't make sense to wait for the final checkpoint in benchmarks since it only prolongs
         // the test but doesn't give any advantages.
-        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
+        configuration.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
         // TODO: remove this line after FLINK-28243 will be done
         configuration.set(REQUIREMENTS_CHECK_DELAY, Duration.ZERO);
         return configuration;
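Restart strategy and state backend are no longer set programmatically on the StreamExecutionEnvironment; this commit declares them through cluster configuration options instead. A minimal sketch of the replacement, using the option keys and string values that appear in the diff (class and method names are illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;

public class BenchmarkClusterConfigSketch {
    static Configuration benchmarkDefaults() {
        Configuration config = new Configuration();
        // "none": fail fast instead of restarting a failing benchmark job.
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        // "hashmap": heap-based backend, standing in for the removed MemoryStateBackend.
        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
        return config;
    }
}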

Diff for: src/main/java/org/apache/flink/benchmark/InputBenchmark.java

+1 -1

@@ -22,7 +22,7 @@
 import org.apache.flink.benchmark.functions.MultiplyByTwo;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;

Diff for: src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java

+6 -4

@@ -20,9 +20,11 @@
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.benchmark.functions.BaseSourceWithKeyRange;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
 
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.runner.Runner;
@@ -54,7 +56,7 @@ public void tupleKeyBy(FlinkEnvironmentContext context) throws Exception {
         env.setParallelism(4);
 
         env.addSource(new IncreasingTupleSource(TUPLE_RECORDS_PER_INVOCATION, 10))
-                .keyBy(0)
+                .keyBy(e -> e.f0)
                 .addSink(new DiscardingSink<>());
 
         env.execute();
@@ -66,8 +68,8 @@ public void arrayKeyBy(FlinkEnvironmentContext context) throws Exception {
         StreamExecutionEnvironment env = context.env;
         env.setParallelism(4);
 
-        env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10))
-                .keyBy(0)
+        DataStreamSource<int[]> source = env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10));
+        source.keyBy(KeySelectorUtil.getSelectorForArray(new int[]{0}, source.getType()))
                 .addSink(new DiscardingSink<>());
 
         env.execute();
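The positional keyBy(int) overload is gone: tuple keys become KeySelector lambdas, and array keys go through KeySelectorUtil.getSelectorForArray as shown above. A compile-oriented sketch of the lambda form for tuples (class and method names are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class KeyByMigrationSketch {
    // Old style (removed): stream.keyBy(0)
    // New style: an explicit KeySelector, here a lambda over the first tuple field.
    static KeyedStream<Tuple2<Integer, Integer>, Integer> keyByFirstField(
            DataStream<Tuple2<Integer, Integer>> stream) {
        return stream.keyBy(t -> t.f0);
    }
}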

Diff for: src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java

+1 -1

@@ -58,7 +58,7 @@ public void stateBackends(MemoryStateBackendContext context) throws Exception {
 
     @State(Thread)
     public static class MemoryStateBackendContext extends StateBackendContext {
-        @Param({"MEMORY", "FS", "FS_ASYNC"})
+        @Param({"MEMORY", "FS"})
         public StateBackend stateBackend = StateBackend.MEMORY;
 
         @Override

Diff for: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java

+2 -2

@@ -36,8 +36,8 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
