Skip to content

Commit 172fbb2

Browse files
authored
MINOR: Add unit test for SerDe auto-configuration (apache#6610)
Reviewers: Guozhang Wang <[email protected]>, Ted Yu <[email protected]>
1 parent 409fabc commit 172fbb2

File tree

8 files changed

+439
-14
lines changed

8 files changed

+439
-14
lines changed

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -647,11 +647,11 @@ private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> so
647647
}
648648
}
649649
for (final SourceNode sn : sources) {
650-
if (sn.getKeyDeSerializer() != null) {
651-
sn.getKeyDeSerializer().configure(config.originals(), true);
650+
if (sn.getKeyDeserializer() != null) {
651+
sn.getKeyDeserializer().configure(config.originals(), true);
652652
}
653-
if (sn.getValueDeSerializer() != null) {
654-
sn.getValueDeSerializer().configure(config.originals(), false);
653+
if (sn.getValueDeserializer() != null) {
654+
sn.getValueDeserializer().configure(config.originals(), false);
655655
}
656656
}
657657
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ public SourceNode(final String name,
5252
this(name, topics, null, keyDeserializer, valDeserializer);
5353
}
5454

55-
public Deserializer getKeyDeSerializer() {
55+
public Deserializer getKeyDeserializer() {
5656
return keyDeserializer;
5757
}
5858

59-
public Deserializer getValueDeSerializer() {
59+
public Deserializer getValueDeserializer() {
6060
return valDeserializer;
6161
}
6262

streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,20 @@
2525
import org.apache.kafka.common.config.ConfigException;
2626
import org.apache.kafka.common.metrics.Sensor;
2727
import org.apache.kafka.common.network.Selectable;
28+
import org.apache.kafka.common.serialization.Deserializer;
29+
import org.apache.kafka.common.serialization.Serde;
2830
import org.apache.kafka.common.serialization.Serdes;
31+
import org.apache.kafka.common.serialization.Serializer;
2932
import org.apache.kafka.common.serialization.StringDeserializer;
3033
import org.apache.kafka.common.serialization.StringSerializer;
3134
import org.apache.kafka.common.utils.Utils;
3235
import org.apache.kafka.streams.errors.StreamsException;
3336
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
3437
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
3538
import org.apache.kafka.streams.kstream.Consumed;
39+
import org.apache.kafka.streams.kstream.Grouped;
3640
import org.apache.kafka.streams.kstream.Materialized;
41+
import org.apache.kafka.streams.kstream.Produced;
3742
import org.apache.kafka.streams.processor.AbstractProcessor;
3843
import org.apache.kafka.streams.processor.ThreadMetadata;
3944
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -74,6 +79,13 @@
7479
import java.util.stream.Collectors;
7580

7681
import static java.util.Arrays.asList;
82+
import static org.easymock.EasyMock.anyObject;
83+
import static org.easymock.EasyMock.eq;
84+
import static org.easymock.EasyMock.expect;
85+
import static org.easymock.EasyMock.expectLastCall;
86+
import static org.easymock.EasyMock.mock;
87+
import static org.easymock.EasyMock.replay;
88+
import static org.easymock.EasyMock.verify;
7789
import static org.junit.Assert.assertEquals;
7890
import static org.junit.Assert.assertFalse;
7991
import static org.junit.Assert.assertNotNull;
@@ -725,6 +737,131 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
725737
startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true);
726738
}
727739

740+
@SuppressWarnings("unchecked")
741+
@Test
742+
public void shouldInitializeUserSerdes() {
743+
final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class);
744+
mockSourceKeyDeserialzer.configure(anyObject(), eq(true));
745+
expectLastCall();
746+
final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class);
747+
mockSourceValueDeserialzer.configure(anyObject(), eq(false));
748+
expectLastCall();
749+
750+
final Serde mockSourceKeySerde = mock(Serde.class);
751+
final Serde mockSourceValueSerde = mock(Serde.class);
752+
expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes();
753+
expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes();
754+
755+
756+
757+
final Serializer mockThroughKeySerializer = mock(Serializer.class);
758+
mockThroughKeySerializer.configure(anyObject(), eq(true));
759+
expectLastCall();
760+
final Serializer mockThroughValueSerializer = mock(Serializer.class);
761+
mockThroughValueSerializer.configure(anyObject(), eq(false));
762+
expectLastCall();
763+
final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
764+
mockThroughKeyDeserializer.configure(anyObject(), eq(true));
765+
expectLastCall();
766+
final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
767+
mockThroughValueDeserializer.configure(anyObject(), eq(false));
768+
expectLastCall();
769+
770+
final Serde mockThroughKeySerde = mock(Serde.class);
771+
final Serde mockThroughValueSerde = mock(Serde.class);
772+
expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
773+
expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
774+
expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
775+
expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();
776+
777+
778+
779+
final Serializer mockGroupedKeySerializer = mock(Serializer.class);
780+
mockGroupedKeySerializer.configure(anyObject(), eq(true));
781+
expectLastCall();
782+
final Serializer mockGroupedValueSerializer = mock(Serializer.class);
783+
mockGroupedValueSerializer.configure(anyObject(), eq(false));
784+
expectLastCall();
785+
final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
786+
mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
787+
expectLastCall();
788+
final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
789+
mockGroupedValueDeserializer.configure(anyObject(), eq(false));
790+
expectLastCall();
791+
792+
final Serde mockGroupedKeySerde = mock(Serde.class);
793+
final Serde mockGroupedValueSerde = mock(Serde.class);
794+
expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
795+
expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
796+
expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
797+
expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();
798+
799+
800+
801+
final Serializer mockOutputKeySerializer = mock(Serializer.class);
802+
mockOutputKeySerializer.configure(anyObject(), eq(true));
803+
expectLastCall();
804+
final Serializer mockOutputValueSerializer = mock(Serializer.class);
805+
mockOutputValueSerializer.configure(anyObject(), eq(false));
806+
expectLastCall();
807+
808+
final Serde mockOutputKeySerde = mock(Serde.class);
809+
final Serde mockOutputValueSerde = mock(Serde.class);
810+
expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
811+
expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();
812+
813+
814+
815+
final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
816+
mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
817+
expectLastCall();
818+
final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
819+
mockGlobalValueDeserializer.configure(anyObject(), eq(false));
820+
expectLastCall();
821+
822+
final Serde mockGlobalKeySerde = mock(Serde.class);
823+
final Serde mockGlobalValueSerde = mock(Serde.class);
824+
expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
825+
expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();
826+
827+
828+
829+
builder
830+
.stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
831+
.through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
832+
.selectKey(KeyValue::pair)
833+
.groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
834+
.count()
835+
.toStream()
836+
.to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
837+
builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));
838+
839+
replay(
840+
mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde,
841+
mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
842+
mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
843+
mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
844+
mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
845+
mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
846+
mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);
847+
848+
KafkaStreams kafkaStreams = null;
849+
try {
850+
kafkaStreams = new KafkaStreams(builder.build(), props);
851+
} finally {
852+
if (kafkaStreams != null) {
853+
kafkaStreams.close();
854+
}
855+
}
856+
857+
verify(
858+
mockSourceKeyDeserialzer, mockSourceValueDeserialzer,
859+
mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
860+
mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
861+
mockOutputKeySerializer, mockOutputValueSerializer,
862+
mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
863+
}
864+
728865
@SuppressWarnings("unchecked")
729866
private Topology getStatefulTopology(final String inputTopic,
730867
final String outputTopic,

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.metrics.KafkaMetric;
2222
import org.apache.kafka.common.metrics.Metrics;
2323
import org.apache.kafka.common.metrics.Sensor;
24+
import org.apache.kafka.common.serialization.Serde;
2425
import org.apache.kafka.common.serialization.Serdes;
2526
import org.apache.kafka.common.utils.Bytes;
2627
import org.apache.kafka.common.utils.MockTime;
@@ -39,8 +40,8 @@
3940
import org.junit.runner.RunWith;
4041

4142
import java.util.Collections;
42-
import java.util.List;
4343
import java.util.HashMap;
44+
import java.util.List;
4445
import java.util.Map;
4546

4647
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -52,6 +53,7 @@
5253
import static org.easymock.EasyMock.expectLastCall;
5354
import static org.easymock.EasyMock.mock;
5455
import static org.easymock.EasyMock.replay;
56+
import static org.easymock.EasyMock.reset;
5557
import static org.easymock.EasyMock.verify;
5658
import static org.hamcrest.CoreMatchers.equalTo;
5759
import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,6 +104,53 @@ private void init() {
102104
metered.init(context, metered);
103105
}
104106

107+
@SuppressWarnings("unchecked")
108+
@Test
109+
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
110+
metered = new MeteredKeyValueStore<>(
111+
inner,
112+
"scope",
113+
new MockTime(),
114+
null,
115+
null
116+
);
117+
final Serde mockSerde = mock(Serde.class);
118+
replay(mockSerde);
119+
expect(context.keySerde()).andReturn(mockSerde);
120+
expect(context.valueSerde()).andReturn(mockSerde);
121+
122+
init();
123+
verify(context, mockSerde);
124+
}
125+
126+
@Test
127+
public void shouldConfigureUserSerdes() {
128+
final Serde<String> mockKeySerde = mock(Serde.class);
129+
mockKeySerde.configure(anyObject(), eq(true));
130+
expectLastCall();
131+
132+
final Serde<String> mockValueSerde = mock(Serde.class);
133+
mockValueSerde.configure(anyObject(), eq(false));
134+
expectLastCall();
135+
136+
replay(mockKeySerde, mockValueSerde);
137+
138+
metered = new MeteredKeyValueStore<>(
139+
inner,
140+
"scope",
141+
new MockTime(),
142+
mockKeySerde,
143+
mockValueSerde
144+
);
145+
146+
reset(context);
147+
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
148+
expect(context.taskId()).andReturn(taskId).anyTimes();
149+
150+
init();
151+
verify(context, mockKeySerde, mockValueSerde);
152+
}
153+
105154
@Test
106155
public void testMetrics() {
107156
init();

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.common.metrics.KafkaMetric;
2323
import org.apache.kafka.common.metrics.Metrics;
2424
import org.apache.kafka.common.metrics.Sensor;
25+
import org.apache.kafka.common.serialization.Serde;
2526
import org.apache.kafka.common.serialization.Serdes;
2627
import org.apache.kafka.common.utils.Bytes;
2728
import org.apache.kafka.common.utils.MockTime;
@@ -54,6 +55,7 @@
5455
import static org.easymock.EasyMock.expectLastCall;
5556
import static org.easymock.EasyMock.mock;
5657
import static org.easymock.EasyMock.replay;
58+
import static org.easymock.EasyMock.reset;
5759
import static org.easymock.EasyMock.verify;
5860
import static org.hamcrest.CoreMatchers.equalTo;
5961
import static org.hamcrest.MatcherAssert.assertThat;
@@ -101,6 +103,53 @@ private void init() {
101103
metered.init(context, metered);
102104
}
103105

106+
@SuppressWarnings("unchecked")
107+
@Test
108+
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
109+
metered = new MeteredSessionStore<>(
110+
inner,
111+
"scope",
112+
null,
113+
null,
114+
new MockTime()
115+
);
116+
final Serde mockSerde = mock(Serde.class);
117+
replay(mockSerde);
118+
expect(context.keySerde()).andReturn(mockSerde);
119+
expect(context.valueSerde()).andReturn(mockSerde);
120+
121+
init();
122+
verify(context, mockSerde);
123+
}
124+
125+
@Test
126+
public void shouldConfigureUserSerdes() {
127+
final Serde<String> mockKeySerde = mock(Serde.class);
128+
mockKeySerde.configure(anyObject(), eq(true));
129+
expectLastCall();
130+
131+
final Serde<String> mockValueSerde = mock(Serde.class);
132+
mockValueSerde.configure(anyObject(), eq(false));
133+
expectLastCall();
134+
135+
replay(mockKeySerde, mockValueSerde);
136+
137+
metered = new MeteredSessionStore<>(
138+
inner,
139+
"scope",
140+
mockKeySerde,
141+
mockValueSerde,
142+
new MockTime()
143+
);
144+
145+
reset(context);
146+
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
147+
expect(context.taskId()).andReturn(taskId).anyTimes();
148+
149+
init();
150+
verify(context, mockKeySerde, mockValueSerde);
151+
}
152+
104153
@Test
105154
public void testMetrics() {
106155
init();

0 commit comments

Comments
 (0)