
Commit ae8b784
Author: Boyang Chen
KAFKA-12499: add transaction timeout verification (#10482)
This PR adds a check that compares the producer transaction timeout against the Streams commit interval. If the transaction timeout is smaller than the commit interval, the Streams application should fail fast at startup and tell the user either to lower the commit interval so it is no larger than the transaction timeout, or to raise the transaction timeout so it is at least the commit interval.

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>, A. Sophie Blee-Goldman <[email protected]>
1 parent 72d1082 commit ae8b784
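For context, the check means that with exactly-once processing enabled, 'commit.interval.ms' must not exceed the producer's 'transaction.timeout.ms'. The snippet below is a minimal sketch (not part of this commit) of a configuration that passes the new verification; the application id and bootstrap servers are placeholder assumptions, and the commented-out line shows a value that would now be rejected.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalVsTxnTimeoutSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "txn-timeout-demo");   // placeholder application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder bootstrap server
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // Passes: the commit interval (10s) does not exceed the producer transaction timeout (30s).
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000L);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 30_000);

        // Would now throw IllegalArgumentException: a 60s commit interval exceeds the 30s transaction timeout.
        // props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000L);

        new StreamsConfig(props); // the verification runs during construction when EOS is enabled
    }
}

Note that under EOS, Streams overrides the producer transaction timeout to a 10-second default and uses a 100 ms default commit interval (see the StreamsConfig diff below), so default configurations pass the check.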

65 files changed: +138 −80 lines changed (not all changed files are shown below)


streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java

+1-1
@@ -180,7 +180,7 @@ public static void main(final String[] args) {
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSONSerde.class);
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSONSerde.class);
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);

         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

+23-1
@@ -58,6 +58,7 @@
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.common.config.ConfigDef.parseType;

 /**
  * Configuration for a {@link KafkaStreams} instance.
@@ -144,6 +145,7 @@ public class StreamsConfig extends AbstractConfig {
     private final boolean eosEnabled;
     private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
     private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
+    private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000;

     public static final int DUMMY_THREAD_INDEX = 1;
     public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
@@ -907,7 +909,7 @@ public class StreamsConfig extends AbstractConfig {
         tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
         tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
         // Reduce the transaction timeout for quicker pending offset expiration on broker side.
-        tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);
+        tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT);

         PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
@@ -1078,6 +1080,26 @@ protected StreamsConfig(final Map<?, ?> props,
         if (props.containsKey(RETRIES_CONFIG)) {
             log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG);
         }
+
+        if (eosEnabled) {
+            verifyEOSTransactionTimeoutCompatibility();
+        }
+    }
+
+    private void verifyEOSTransactionTimeoutCompatibility() {
+        final long commitInterval = getLong(COMMIT_INTERVAL_MS_CONFIG);
+        final String transactionTimeoutConfigKey = producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+        final int transactionTimeout = originals().containsKey(transactionTimeoutConfigKey) ? (int) parseType(
+            transactionTimeoutConfigKey, originals().get(transactionTimeoutConfigKey), Type.INT) : DEFAULT_TRANSACTION_TIMEOUT;
+
+        if (transactionTimeout < commitInterval) {
+            throw new IllegalArgumentException(String.format("Transaction timeout %d was set lower than " +
+                "streams commit interval %d. This will cause ongoing transaction always timeout due to inactivity " +
+                "caused by long commit interval. Consider reconfiguring commit interval to match " +
+                "transaction timeout by tuning 'commit.interval.ms' config, or increase the transaction timeout to match " +
+                "commit interval by tuning `producer.transaction.timeout.ms` config.",
+                transactionTimeout, commitInterval));
+        }
     }

     @Override

streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

+27
@@ -47,6 +47,7 @@

 import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
 import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
+import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
@@ -398,6 +399,32 @@ public void shouldOverrideStreamsDefaultProducerConfigs() {
         assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
     }

+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSAlpha() {
+        assertThrows(IllegalArgumentException.class,
+            () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE));
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSBeta() {
+        assertThrows(IllegalArgumentException.class,
+            () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE_BETA));
+    }
+
+    @Test
+    public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() {
+        testTransactionTimeoutSmallerThanCommitInterval(AT_LEAST_ONCE);
+    }
+
+    private void testTransactionTimeoutSmallerThanCommitInterval(final String processingGuarantee) {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000L);
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 3000);
+        new StreamsConfig(props);
+    }
+
     @Test
     public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");

streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

+1-1
@@ -148,7 +148,7 @@ private void prepareConfigs(final String appID) {
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));

streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java

+1-1
@@ -90,7 +90,7 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
             mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
             mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
             mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
             mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
         )

streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java

+8-3
@@ -236,7 +236,7 @@ private void runSimpleCopyTest(final int numberOfRestarts,
         final Properties properties = new Properties();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
@@ -325,7 +325,7 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

@@ -874,9 +874,14 @@ public void close() { }
                .to(SINGLE_PARTITION_OUTPUT_TOPIC);

         final Properties properties = new Properties();
+        // Set commit interval to a larger value to avoid affection of controlled stream commit,
+        // but not too large as we need to have a relatively low transaction timeout such
+        // that it should help trigger the timed out transaction in time.
+        final long commitIntervalMs = 20_000L;
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
-        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitIntervalMs);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);

streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java

+3-2
@@ -936,13 +936,14 @@ public void close() {}
         final Properties properties = new Properties();
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
-        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+        final long commitInterval = Duration.ofMinutes(1L).toMillis();
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
-        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(1L).toMillis());
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
         properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);

streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java

+2-2
@@ -145,7 +145,7 @@ public void setUp() throws IOException {

         final Properties props = new Properties();
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

@@ -284,7 +284,7 @@ public void shouldThrowExceptionOverlappingTopic() {
     public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
         final Properties props = new Properties();
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java

+1-1
@@ -126,7 +126,7 @@ public void before() throws Exception {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);

streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java

+1-1
@@ -110,7 +110,7 @@ public void before() throws Exception {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
                                           Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
                                                   .withKeySerde(Serdes.Long())

streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java

+1-1
@@ -119,7 +119,7 @@ public void before() throws Exception {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);

         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());

streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java

+1-1
@@ -297,7 +297,7 @@ private static Properties streamsProperties(final String appId,
             mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "2"),
             mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"),
             mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
-            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L),
             mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
             // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
             mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)

streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

+1-1
@@ -102,7 +102,7 @@ public void before() {
         streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }

streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

+1-1
@@ -143,7 +143,7 @@ public void before() throws InterruptedException {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java

+1-1
@@ -138,7 +138,7 @@ public void before() throws InterruptedException {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);

streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java

+1-1
@@ -210,7 +210,7 @@ private static Properties getStreamsConfig() {
         streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         // increase the session timeout value, to avoid unnecessary rebalance
         streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
         return streamsConfig;

streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java

+1-1
@@ -70,7 +70,7 @@ public static void startCluster() throws IOException {
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5L);
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
         STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300);

streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java

+1-1
@@ -116,7 +116,7 @@ public void before() {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);

         consumerConfiguration = new Properties();
         consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java

+1-1
@@ -197,7 +197,7 @@ private Properties streamsConfiguration() {
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
