diff --git a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java index 8d23b6e..dd4a242 100644 --- a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java @@ -19,6 +19,7 @@ package org.apache.flink.benchmark; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; @@ -67,8 +68,9 @@ public void serializerPojo(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - executionConfig.getSerializerConfig().registerPojoType(MyPojo.class); - executionConfig.getSerializerConfig().registerPojoType(MyOperation.class); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + serializerConfig.registerPojoType(MyPojo.class); + serializerConfig.registerPojoType(MyOperation.class); env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)) .rebalance() @@ -83,8 +85,9 @@ public void serializerHeavyString(FlinkEnvironmentContext context) throws Except StreamExecutionEnvironment env = context.env; env.setParallelism(1); ExecutionConfig executionConfig = env.getConfig(); - executionConfig.getSerializerConfig().registerPojoType(MyPojo.class); - executionConfig.getSerializerConfig().registerPojoType(MyOperation.class); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + serializerConfig.registerPojoType(MyPojo.class); + serializerConfig.registerPojoType(MyOperation.class); env.addSource(new LongStringSource(RECORDS_PER_INVOCATION, 12)) .rebalance() @@ -112,9 +115,10 @@ public void serializerKryo(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - executionConfig.getSerializerConfig().setForceKryo(true); - executionConfig.getSerializerConfig().registerKryoType(MyPojo.class); - executionConfig.getSerializerConfig().registerKryoType(MyOperation.class); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + serializerConfig.setForceKryo(true); + serializerConfig.registerKryoType(MyPojo.class); + serializerConfig.registerKryoType(MyOperation.class); env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)) .rebalance() diff --git a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java index 7053465..dafb4ae 100644 --- a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java @@ -19,6 +19,7 @@ package org.apache.flink.benchmark.full; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.benchmark.FlinkEnvironmentContext; @@ -83,7 +84,7 @@ public void serializerKryoWithoutRegistration(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); - env.getConfig().getSerializerConfig().setForceAvro(true); + ((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceAvro(true); env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)) .rebalance() @@ -97,7 +98,7 @@ public void serializerKryoWithoutRegistration(FlinkEnvironmentContext context) public void serializerAvroReflect(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.setParallelism(4); - env.getConfig().getSerializerConfig().setForceAvro(true); + ((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceAvro(true); env.addSource(new PojoSource(RECORDS_PER_INVOCATION, 10)) .rebalance() @@ -126,11 +127,11 @@ public void serializerKryoThrift(FlinkEnvironmentContext context) throws Excepti StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - - executionConfig.getSerializerConfig().setForceKryo(true); - executionConfig.getSerializerConfig().addDefaultKryoSerializer( + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + serializerConfig.setForceKryo(true); + serializerConfig.addDefaultKryoSerializer( org.apache.flink.benchmark.thrift.MyPojo.class, TBaseSerializer.class); - executionConfig.getSerializerConfig().addDefaultKryoSerializer( + serializerConfig.addDefaultKryoSerializer( org.apache.flink.benchmark.thrift.MyOperation.class, TBaseSerializer.class); env.addSource(new ThriftPojoSource(RECORDS_PER_INVOCATION, 10)) @@ -146,11 +147,12 @@ public void serializerKryoProtobuf(FlinkEnvironmentContext context) throws Excep StreamExecutionEnvironment env = context.env; env.setParallelism(4); ExecutionConfig executionConfig = env.getConfig(); - executionConfig.getSerializerConfig().setForceKryo(true); - executionConfig.getSerializerConfig().registerTypeWithKryoSerializer( + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig(); + serializerConfig.setForceKryo(true); + serializerConfig.registerTypeWithKryoSerializer( org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class, ProtobufSerializer.class); - executionConfig.getSerializerConfig().registerTypeWithKryoSerializer( + serializerConfig.registerTypeWithKryoSerializer( org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyOperation.class, ProtobufSerializer.class);