Commit f2785f3

KAFKA-12754: Improve endOffsets for TaskMetadata (#10634)
Improve endOffsets for TaskMetadata by updating immediately after polling a new batch.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
1 parent db3e5e2 commit f2785f3
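
The upshot of the change: TaskMetadata#endOffsets() is now refreshed with the highest offset per input partition immediately after each poll, rather than lagging behind. As a hedged sketch (not part of the commit), an application could surface those values through the public KafkaStreams#localThreadsMetadata() API, the same route the new integration test below takes; the EndOffsetProbe class and dump method are invented for illustration:

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

// Illustrative helper, not part of the commit: print the highest end offset
// seen so far for every input partition of every active task on this instance.
public final class EndOffsetProbe {
    public static void dump(final KafkaStreams streams) {
        for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
            for (final TaskMetadata task : thread.activeTasks()) {
                for (final Map.Entry<TopicPartition, Long> entry : task.endOffsets().entrySet()) {
                    // -1 is the sentinel for "no batch polled yet" (see StreamTask below)
                    System.out.println(task.taskId() + " " + entry.getKey() + " -> " + entry.getValue());
                }
            }
        }
    }
}
```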

File tree: 9 files changed, +254 −35 lines changed

streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java (+9)

```diff
@@ -60,14 +60,23 @@ public Set<TopicPartition> topicPartitions() {
         return topicPartitions;
     }
 
+    /**
+     * This function will return a map of TopicPartitions and the highest committed offset seen so far
+     */
     public Map<TopicPartition, Long> committedOffsets() {
         return committedOffsets;
     }
 
+    /**
+     * This function will return a map of TopicPartitions and the highest offset seen so far in the Topic
+     */
    public Map<TopicPartition, Long> endOffsets() {
         return endOffsets;
     }
 
+    /**
+     * This function will return the time task idling started, if the task is not currently idling it will return empty
+     */
     public Optional<Long> timeCurrentIdlingStarted() {
         return timeCurrentIdlingStarted;
     }
```

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (−5)

```diff
@@ -297,11 +297,6 @@ public Optional<Long> timeCurrentIdlingStarted() {
         return Optional.empty();
     }
 
-    @Override
-    public void updateCommittedOffsets(final TopicPartition topicPartition, final Long offset) {
-
-    }
-
     @Override
     public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
         throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
```
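
The removed override was an empty no-op: a standby task restores from its changelog topics via the restore consumer and never sees main-consumer commits, so committed-offset bookkeeping now lives only on StreamTask (see the Task.java and TaskManager.java changes below).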

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (+11 −5)

```diff
@@ -196,9 +196,13 @@ public StreamTask(final TaskId id,
         );
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
-        this.committedOffsets = new HashMap<>();
-        this.highWatermark = new HashMap<>();
-        this.timeCurrentIdlingStarted = Optional.empty();
+        committedOffsets = new HashMap<>();
+        highWatermark = new HashMap<>();
+        for (final TopicPartition topicPartition: inputPartitions) {
+            committedOffsets.put(topicPartition, -1L);
+            highWatermark.put(topicPartition, -1L);
+        }
+        timeCurrentIdlingStarted = Optional.empty();
     }
 
     // create queues for each assigned partition and associate them
@@ -1173,7 +1177,6 @@ public Map<TopicPartition, Long> committedOffsets() {
 
     @Override
     public Map<TopicPartition, Long> highWaterMark() {
-        highWatermark.putAll(recordCollector.offsets());
         return Collections.unmodifiableMap(highWatermark);
     }
 
@@ -1188,11 +1191,14 @@ public Optional<Long> timeCurrentIdlingStarted() {
         return timeCurrentIdlingStarted;
     }
 
-    @Override
     public void updateCommittedOffsets(final TopicPartition topicPartition, final Long offset) {
         committedOffsets.put(topicPartition, offset);
     }
 
+    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
+        highWatermark.put(topicPartition, offset);
+    }
+
     public boolean hasRecordsQueued() {
         return numBuffered() > 0;
     }
```
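
A behavioral consequence of the constructor change: until the first batch is polled and the first commit lands, both maps now hold a -1 sentinel for every input partition instead of being empty. A hedged caller-side sketch of respecting that sentinel when deriving a rough lag figure from the two TaskMetadata maps (the TaskLag class is invented for illustration):

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of the commit: derive an approximate per-partition
// lag from the TaskMetadata maps, treating -1 as "nothing observed yet".
public final class TaskLag {
    public static long approximateLag(final Map<TopicPartition, Long> endOffsets,
                                      final Map<TopicPartition, Long> committedOffsets,
                                      final TopicPartition partition) {
        final long end = endOffsets.getOrDefault(partition, -1L);
        final long committed = committedOffsets.getOrDefault(partition, -1L);
        if (end < 0 || committed < 0) {
            return 0L; // the -1 sentinel from the constructor above: no data seen yet
        }
        return Math.max(0L, end - committed);
    }
}
```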

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (+10)

```diff
@@ -20,6 +20,7 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.KafkaException;
@@ -52,6 +53,7 @@
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -900,6 +902,14 @@ private long pollPhase() {
 
         final int numRecords = records.count();
 
+        for (final TopicPartition topicPartition: records.partitions()) {
+            records
+                .records(topicPartition)
+                .stream()
+                .max(Comparator.comparing(ConsumerRecord::offset))
+                .ifPresent(t -> taskManager.updateTaskEndMetadata(topicPartition, t.offset()));
+        }
+
         log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords);
 
         pollSensor.record(pollLatency, now);
```
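
This loop is the heart of the commit: immediately after each poll, the thread takes the largest offset per partition in the batch and pushes it into the task metadata via updateTaskEndMetadata. Since records of a partition arrive in offset order, the max is simply the batch's last record for that partition; a self-contained sketch of the same idiom against a plain consumer batch (names invented, consumer setup assumed elsewhere):

```java
import java.util.Comparator;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: report the highest offset contained in a freshly polled
// batch, per partition, mirroring the per-partition max taken in pollPhase().
final class BatchHighOffsets {
    static void print(final ConsumerRecords<byte[], byte[]> records) {
        for (final TopicPartition partition : records.partitions()) {
            records.records(partition)
                   .stream()
                   .max(Comparator.comparingLong(ConsumerRecord::offset))
                   .ifPresent(r -> System.out.println(partition + " -> " + r.offset()));
        }
    }
}
```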

streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java (−7)

```diff
@@ -248,11 +248,4 @@ default boolean commitRequested() {
      * @return This returns the time the task started idling. If it is not idling it returns empty.
      */
     Optional<Long> timeCurrentIdlingStarted();
-
-    /**
-     * Update the committed offsets in the Task
-     * @param topicPartition
-     * @param offset
-     */
-    void updateCommittedOffsets(final TopicPartition topicPartition, final Long offset);
 }
```

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (+19 −7)

```diff
@@ -1091,7 +1091,7 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
                 try {
                     tasks.streamsProducerForTask(task.id())
                         .commitTransaction(taskToCommit.getValue(), mainConsumer.groupMetadata());
-                    updateTaskMetadata(taskToCommit.getValue());
+                    updateTaskCommitMetadata(taskToCommit.getValue());
                 } catch (final TimeoutException timeoutException) {
                     log.error(
                         String.format("Committing task %s failed.", task.id()),
@@ -1107,7 +1107,7 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
             if (processingMode == EXACTLY_ONCE_V2) {
                 try {
                     tasks.threadProducer().commitTransaction(allOffsets, mainConsumer.groupMetadata());
-                    updateTaskMetadata(allOffsets);
+                    updateTaskCommitMetadata(allOffsets);
                 } catch (final TimeoutException timeoutException) {
                     log.error(
                         String.format("Committing task(s) %s failed.",
@@ -1125,7 +1125,7 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
             } else {
                 try {
                     mainConsumer.commitSync(allOffsets);
-                    updateTaskMetadata(allOffsets);
+                    updateTaskCommitMetadata(allOffsets);
                 } catch (final CommitFailedException error) {
                     throw new TaskMigratedException("Consumer committing offsets failed, " +
                         "indicating the corresponding thread is no longer part of the group", error);
@@ -1152,11 +1152,23 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
         }
     }
 
-    private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
+    private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
         for (final Task task: tasks.activeTasks()) {
-            for (final TopicPartition topicPartition: task.inputPartitions()) {
-                if (allOffsets.containsKey(topicPartition)) {
-                    task.updateCommittedOffsets(topicPartition, allOffsets.get(topicPartition).offset());
+            if (task instanceof StreamTask) {
+                for (final TopicPartition topicPartition : task.inputPartitions()) {
+                    if (allOffsets.containsKey(topicPartition)) {
+                        ((StreamTask) task).updateCommittedOffsets(topicPartition, allOffsets.get(topicPartition).offset());
+                    }
+                }
+            }
+        }
+    }
+
+    public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset) {
+        for (final Task task: tasks.activeTasks()) {
+            if (task instanceof StreamTask) {
+                if (task.inputPartitions().contains(topicPartition)) {
+                    ((StreamTask) task).updateEndOffsets(topicPartition, offset);
                 }
             }
         }
```
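
With updateCommittedOffsets gone from the Task interface and StandbyTask, the manager narrows to StreamTask via instanceof before invoking either setter. A purely illustrative restatement of that filter-then-cast dispatch, with the task types stubbed so the sketch compiles on its own:

```java
import java.util.Collection;
import java.util.function.Consumer;

// Illustrative only: the filter-then-cast dispatch TaskManager applies, with
// Task/StreamTask stubbed out so the sketch is self-contained.
final class StreamTaskDispatch {
    interface Task { }
    static final class StreamTask implements Task { }

    static void forEachStreamTask(final Collection<Task> activeTasks,
                                  final Consumer<StreamTask> action) {
        for (final Task task : activeTasks) {
            if (task instanceof StreamTask) { // standbys and other Task kinds are skipped
                action.accept((StreamTask) task);
            }
        }
    }
}
```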
streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java (+200, new file; path inferred from the package and class name)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.streams.integration;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@Category(IntegrationTest.class)
public class TaskMetadataIntegrationTest {

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }
    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);

    @Rule
    public TestName testName = new TestName();

    private String inputTopic;
    private static StreamsBuilder builder;
    private static Properties properties;
    private static String appId = "TaskMetadataTest_";
    private AtomicBoolean process;
    private AtomicBoolean commit;

    @Before
    public void setup() {
        final String testId = safeUniqueTestName(getClass(), testName);
        appId = appId + testId;
        inputTopic = "input" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);

        builder = new StreamsBuilder();

        process = new AtomicBoolean(true);
        commit = new AtomicBoolean(true);

        final KStream<String, String> stream = builder.stream(inputTopic);
        stream.process(PauseProcessor::new);

        properties = mkObjectProperties(
            mkMap(
                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1L)
            )
        );
    }

    @Test
    public void shouldReportCorrectCommittedOffsetInformation() {
        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), DEFAULT_DURATION);
            final TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
            assertThat(taskMetadata.committedOffsets().size(), equalTo(1));
            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);

            produceMessages(0L, inputTopic, "test");
            TestUtils.waitForCondition(() -> !process.get(), "The record was not processed");
            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 1L, "the record was processed");
            process.set(true);

            produceMessages(0L, inputTopic, "test1");
            TestUtils.waitForCondition(() -> !process.get(), "The record was not processed");
            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 2L, "the record was processed");
            process.set(true);

            produceMessages(0L, inputTopic, "test1");
            TestUtils.waitForCondition(() -> !process.get(), "The record was not processed");
            TestUtils.waitForCondition(() -> taskMetadata.committedOffsets().get(topicPartition) == 3L, "the record was processed");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void shouldReportCorrectEndOffsetInformation() {
        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), DEFAULT_DURATION);
            final TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
            assertThat(taskMetadata.endOffsets().size(), equalTo(1));
            final TopicPartition topicPartition = new TopicPartition(inputTopic, 0);
            commit.set(false);

            for (int i = 0; i < 10; i++) {
                produceMessages(0L, inputTopic, "test");
                TestUtils.waitForCondition(() -> !process.get(), "The record was not processed");
                process.set(true);
            }
            assertThat(taskMetadata.endOffsets().get(topicPartition), equalTo(9L));

        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
        final List<TaskMetadata> taskMetadataList = kafkaStreams.localThreadsMetadata().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
        assertThat("only one task", taskMetadataList.size() == 1);
        return taskMetadataList.get(0);
    }

    @After
    public void teardown() throws IOException {
        purgeLocalStreamsState(properties);
    }

    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            Collections.singletonList(new KeyValue<>("1", msg)),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                StringSerializer.class,
                StringSerializer.class,
                new Properties()),
            timestamp);
    }

    private class PauseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            while (!process.get()) {
                try {
                    wait(100);
                } catch (final InterruptedException e) {

                }
            }
            context().forward(key, value);
            if (commit.get()) {
                context().commit();
            }
            process.set(false);
        }
    }
}
```
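
A note on the end-offset assertion in shouldReportCorrectEndOffsetInformation: with commits disabled, the ten produced records land at offsets 0 through 9 of partition 0, and because endOffsets() reports the highest record offset seen in a polled batch (not the broker's log-end offset, which would be 10), the metadata settles at 9L.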
