Skip to content

Commit e56ebbf

Browse files
tedyumjsax
authored andcommitted
[KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (apache#6461)
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
1 parent 964e90a commit e56ebbf

File tree

13 files changed

+135
-17
lines changed

13 files changed

+135
-17
lines changed

docs/streams/developer-guide/datatypes.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ <h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Pe
6868
</div>
6969
<div class="section" id="overriding-default-serdes">
7070
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
71-
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
71+
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. For this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
7272
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
7373
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
7474

docs/streams/developer-guide/dsl-api.html

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3547,7 +3547,8 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
35473547
// Write the (continuously updating) results to the output topic.
35483548
clicksPerRegion.toStream.to(outputTopic)
35493549
</pre>
3550-
<p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
3550+
<p>A complete example of user-defined SerDes can be found in a test class within the library.
3551+
Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
35513552
</div>
35523553
</div>
35533554
</div>

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
5050
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
5151
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
52+
import org.apache.kafka.streams.processor.internals.SinkNode;
53+
import org.apache.kafka.streams.processor.internals.SourceNode;
5254
import org.apache.kafka.streams.processor.internals.StateDirectory;
5355
import org.apache.kafka.streams.processor.internals.StreamThread;
5456
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -634,6 +636,26 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
634636
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
635637
}
636638

639+
@SuppressWarnings("unchecked")
640+
private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
641+
for (final SinkNode sn : sinks) {
642+
if (sn.getKeySerializer() != null) {
643+
sn.getKeySerializer().configure(config.originals(), true);
644+
}
645+
if (sn.getValueSerializer() != null) {
646+
sn.getValueSerializer().configure(config.originals(), false);
647+
}
648+
}
649+
for (final SourceNode sn : sources) {
650+
if (sn.getKeyDeSerializer() != null) {
651+
sn.getKeyDeSerializer().configure(config.originals(), true);
652+
}
653+
if (sn.getValueDeSerializer() != null) {
654+
sn.getValueDeSerializer().configure(config.originals(), false);
655+
}
656+
}
657+
}
658+
637659
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
638660
final StreamsConfig config,
639661
final KafkaClientSupplier clientSupplier,
@@ -670,6 +692,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
670692
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
671693
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
672694

695+
configureSerDes(taskTopology.sinks(), taskTopology.sources());
673696
streamsMetadataState = new StreamsMetadataState(
674697
internalTopologyBuilder,
675698
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -683,6 +706,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
683706
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
684707
}
685708
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
709+
686710
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
687711
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
688712
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
@@ -696,6 +720,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
696720
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
697721
GlobalStreamThread.State globalThreadState = null;
698722
if (globalTaskTopology != null) {
723+
configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources());
724+
699725
final String globalThreadId = clientId + "-GlobalStreamThread";
700726
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
701727
config,

streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
4444
this.partitioner = partitioner;
4545
}
4646

47+
public Serializer getKeySerializer() {
48+
return keySerializer;
49+
}
50+
51+
public Serializer getValueSerializer() {
52+
return valSerializer;
53+
}
54+
4755
/**
4856
* @throws UnsupportedOperationException if this method adds a child to a sink node
4957
*/

streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ public SourceNode(final String name,
5252
this(name, topics, null, keyDeserializer, valDeserializer);
5353
}
5454

55+
public Deserializer getKeyDeSerializer() {
56+
return keyDeserializer;
57+
}
58+
59+
public Deserializer getValueDeSerializer() {
60+
return valDeserializer;
61+
}
62+
5563
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
5664
return keyDeserializer.deserialize(topic, headers, data);
5765
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,23 @@ public void init(final ProcessorContext context,
115115

116116
@SuppressWarnings("unchecked")
117117
void initStoreSerde(final ProcessorContext context) {
118+
final Serde<K> usedKeySerde;
119+
final Serde<V> usedValueSerde;
120+
final Map<String, Object> conf = context.appConfigs();
121+
if (keySerde == null) {
122+
usedKeySerde = (Serde<K>) context.keySerde();
123+
} else {
124+
usedKeySerde = keySerde;
125+
usedKeySerde.configure(conf, true);
126+
}
127+
if (valueSerde == null) {
128+
usedValueSerde = (Serde<V>) context.valueSerde();
129+
} else {
130+
usedValueSerde = valueSerde;
131+
usedValueSerde.configure(conf, false);
132+
}
118133
serdes = new StateSerdes<>(
119-
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
120-
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
121-
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
134+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
122135
}
123136

124137
@SuppressWarnings("unchecked")

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,23 @@ public class MeteredSessionStore<K, V>
6969
public void init(final ProcessorContext context,
7070
final StateStore root) {
7171
//noinspection unchecked
72+
final Serde<K> usedKeySerde;
73+
final Serde<V> usedValueSerde;
74+
final Map<String, Object> conf = context.appConfigs();
75+
if (keySerde == null) {
76+
usedKeySerde = (Serde<K>) context.keySerde();
77+
} else {
78+
usedKeySerde = keySerde;
79+
usedKeySerde.configure(conf, true);
80+
}
81+
if (valueSerde == null) {
82+
usedValueSerde = (Serde<V>) context.valueSerde();
83+
} else {
84+
usedValueSerde = valueSerde;
85+
usedValueSerde.configure(conf, false);
86+
}
7287
serdes = new StateSerdes<>(
73-
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
74-
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
75-
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
88+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
7689
metrics = (StreamsMetricsImpl) context.metrics();
7790

7891
taskName = context.taskId().toString();

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
2727
import org.apache.kafka.streams.state.ValueAndTimestamp;
2828

29+
import java.util.Map;
30+
2931
/**
3032
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
3133
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
@@ -48,9 +50,22 @@ public class MeteredTimestampedKeyValueStore<K, V>
4850

4951
@SuppressWarnings("unchecked")
5052
void initStoreSerde(final ProcessorContext context) {
53+
final Serde<K> usedKeySerde;
54+
final Serde<ValueAndTimestamp<V>> usedValueSerde;
55+
final Map<String, Object> conf = context.appConfigs();
56+
if (keySerde == null) {
57+
usedKeySerde = (Serde<K>) context.keySerde();
58+
} else {
59+
usedKeySerde = keySerde;
60+
usedKeySerde.configure(conf, true);
61+
}
62+
if (valueSerde == null) {
63+
usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
64+
} else {
65+
usedValueSerde = valueSerde;
66+
usedValueSerde.configure(conf, false);
67+
}
5168
serdes = new StateSerdes<>(
52-
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
53-
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
54-
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
69+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
5570
}
5671
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.kafka.streams.state.ValueAndTimestamp;
2727
import org.apache.kafka.streams.state.WindowStore;
2828

29+
import java.util.Map;
30+
2931
/**
3032
* A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
3133
* inner WindowStore implementation do not need to provide its own metrics collecting functionality.
@@ -50,9 +52,22 @@ class MeteredTimestampedWindowStore<K, V>
5052
@SuppressWarnings("unchecked")
5153
@Override
5254
void initStoreSerde(final ProcessorContext context) {
55+
final Serde<K> usedKeySerde;
56+
final Serde<ValueAndTimestamp<V>> usedValueSerde;
57+
final Map<String, Object> conf = context.appConfigs();
58+
if (keySerde == null) {
59+
usedKeySerde = (Serde<K>) context.keySerde();
60+
} else {
61+
usedKeySerde = keySerde;
62+
usedKeySerde.configure(conf, true);
63+
}
64+
if (valueSerde == null) {
65+
usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
66+
} else {
67+
usedValueSerde = valueSerde;
68+
usedValueSerde.configure(conf, false);
69+
}
5370
serdes = new StateSerdes<>(
54-
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
55-
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
56-
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
71+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
5772
}
5873
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,23 @@ public void init(final ProcessorContext context,
9999

100100
@SuppressWarnings("unchecked")
101101
void initStoreSerde(final ProcessorContext context) {
102+
final Serde<K> usedKeySerde;
103+
final Serde<V> usedValueSerde;
104+
final Map<String, Object> conf = context.appConfigs();
105+
if (keySerde == null) {
106+
usedKeySerde = (Serde<K>) context.keySerde();
107+
} else {
108+
usedKeySerde = keySerde;
109+
usedKeySerde.configure(conf, true);
110+
}
111+
if (valueSerde == null) {
112+
usedValueSerde = (Serde<V>) context.valueSerde();
113+
} else {
114+
usedValueSerde = valueSerde;
115+
usedValueSerde.configure(conf, false);
116+
}
102117
serdes = new StateSerdes<>(
103-
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
104-
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
105-
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
118+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
106119
}
107120

108121
@SuppressWarnings("unchecked")

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.util.Collections;
4242
import java.util.List;
43+
import java.util.HashMap;
4344
import java.util.Map;
4445

4546
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -92,6 +93,7 @@ public void before() {
9293
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
9394
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
9495
expect(context.taskId()).andReturn(taskId);
96+
expect(context.appConfigs()).andReturn(new HashMap<>());
9597
expect(inner.name()).andReturn("metered").anyTimes();
9698
}
9799

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.junit.runner.RunWith;
4343

4444
import java.util.Collections;
45+
import java.util.HashMap;
4546
import java.util.Map;
4647

4748
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -91,6 +92,7 @@ public void before() {
9192
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
9293
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
9394
expect(context.taskId()).andReturn(taskId);
95+
expect(context.appConfigs()).andReturn(new HashMap<>());
9496
expect(inner.name()).andReturn("metered").anyTimes();
9597
}
9698

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import java.util.Collections;
4343
import java.util.List;
44+
import java.util.HashMap;
4445
import java.util.Map;
4546

4647
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -94,6 +95,7 @@ public void before() {
9495
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
9596
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
9697
expect(context.taskId()).andReturn(taskId);
98+
expect(context.appConfigs()).andReturn(new HashMap<>());
9799
expect(inner.name()).andReturn("metered").anyTimes();
98100
}
99101

0 commit comments

Comments
 (0)