Skip to content

Commit 409fabc

Browse files
mumrahhachikuji
authored andcommitted
KAFKA-7747; Check for truncation after leader changes [KIP-320] (apache#6371)
After the client detects a leader change we need to check the offset of the current leader for truncation. These changes were part of KIP-320: https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation. Reviewers: Jason Gustafson <[email protected]>
1 parent e56ebbf commit 409fabc

23 files changed

+1574
-240
lines changed

clients/src/main/java/org/apache/kafka/clients/Metadata.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.Cluster;
2020
import org.apache.kafka.common.KafkaException;
21+
import org.apache.kafka.common.Node;
2122
import org.apache.kafka.common.PartitionInfo;
2223
import org.apache.kafka.common.TopicPartition;
2324
import org.apache.kafka.common.errors.AuthenticationException;
@@ -314,6 +315,8 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
314315
if (metadata.isInternal())
315316
internalTopics.add(metadata.topic());
316317
for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
318+
319+
// Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
317320
updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
318321
int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
319322
partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
@@ -358,8 +361,8 @@ private void updatePartitionInfo(String topic,
358361
}
359362
}
360363
} else {
361-
// Old cluster format (no epochs)
362-
lastSeenLeaderEpochs.clear();
364+
// Handle old cluster formats as well as error responses where leader and epoch are missing
365+
lastSeenLeaderEpochs.remove(tp);
363366
partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata));
364367
}
365368
}
@@ -444,4 +447,55 @@ private MetadataRequestAndVersion(MetadataRequest.Builder requestBuilder,
444447
}
445448
}
446449

450+
public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
451+
return partitionInfoIfCurrent(tp)
452+
.map(infoAndEpoch -> {
453+
Node leader = infoAndEpoch.partitionInfo().leader();
454+
return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, Optional.of(infoAndEpoch.epoch()));
455+
})
456+
.orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
457+
}
458+
459+
public static class LeaderAndEpoch {
460+
461+
public static final LeaderAndEpoch NO_LEADER_OR_EPOCH = new LeaderAndEpoch(Node.noNode(), Optional.empty());
462+
463+
public final Node leader;
464+
public final Optional<Integer> epoch;
465+
466+
public LeaderAndEpoch(Node leader, Optional<Integer> epoch) {
467+
this.leader = Objects.requireNonNull(leader);
468+
this.epoch = Objects.requireNonNull(epoch);
469+
}
470+
471+
public static LeaderAndEpoch noLeaderOrEpoch() {
472+
return NO_LEADER_OR_EPOCH;
473+
}
474+
475+
@Override
476+
public boolean equals(Object o) {
477+
if (this == o) return true;
478+
if (o == null || getClass() != o.getClass()) return false;
479+
480+
LeaderAndEpoch that = (LeaderAndEpoch) o;
481+
482+
if (!leader.equals(that.leader)) return false;
483+
return epoch.equals(that.epoch);
484+
}
485+
486+
@Override
487+
public int hashCode() {
488+
int result = leader.hashCode();
489+
result = 31 * result + epoch.hashCode();
490+
return result;
491+
}
492+
493+
@Override
494+
public String toString() {
495+
return "LeaderAndEpoch{" +
496+
"leader=" + leader +
497+
", epoch=" + epoch.map(Number::toString).orElse("absent") +
498+
'}';
499+
}
500+
}
447501
}

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.ApiVersions;
2020
import org.apache.kafka.clients.ClientDnsLookup;
2121
import org.apache.kafka.clients.ClientUtils;
22+
import org.apache.kafka.clients.Metadata;
2223
import org.apache.kafka.clients.NetworkClient;
2324
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
2425
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
@@ -69,6 +70,7 @@
6970
import java.util.Locale;
7071
import java.util.Map;
7172
import java.util.Objects;
73+
import java.util.Optional;
7274
import java.util.Properties;
7375
import java.util.Set;
7476
import java.util.concurrent.TimeUnit;
@@ -1508,7 +1510,20 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
15081510
*/
15091511
@Override
15101512
public void seek(TopicPartition partition, long offset) {
1511-
seek(partition, new OffsetAndMetadata(offset, null));
1513+
if (offset < 0)
1514+
throw new IllegalArgumentException("seek offset must not be a negative number");
1515+
1516+
acquireAndEnsureOpen();
1517+
try {
1518+
log.info("Seeking to offset {} for partition {}", offset, partition);
1519+
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
1520+
offset,
1521+
Optional.empty(), // This will ensure we skip validation
1522+
this.metadata.leaderAndEpoch(partition));
1523+
this.subscriptions.seek(partition, newPosition);
1524+
} finally {
1525+
release();
1526+
}
15121527
}
15131528

15141529
/**
@@ -1535,8 +1550,13 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
15351550
} else {
15361551
log.info("Seeking to offset {} for partition {}", offset, partition);
15371552
}
1553+
Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.leaderAndEpoch(partition);
1554+
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
1555+
offsetAndMetadata.offset(),
1556+
offsetAndMetadata.leaderEpoch(),
1557+
currentLeaderAndEpoch);
15381558
this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
1539-
this.subscriptions.seek(partition, offset);
1559+
this.subscriptions.seekAndValidate(partition, newPosition);
15401560
} finally {
15411561
release();
15421562
}
@@ -1658,9 +1678,9 @@ public long position(TopicPartition partition, final Duration timeout) {
16581678

16591679
Timer timer = time.timer(timeout);
16601680
do {
1661-
Long offset = this.subscriptions.position(partition);
1662-
if (offset != null)
1663-
return offset;
1681+
SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
1682+
if (position != null)
1683+
return position.offset;
16641684

16651685
updateFetchPositions(timer);
16661686
client.poll(timer);
@@ -2196,6 +2216,9 @@ private void close(long timeoutMs, boolean swallowException) {
21962216
* @return true iff the operation completed without timing out
21972217
*/
21982218
private boolean updateFetchPositions(final Timer timer) {
2219+
// If any partitions have been truncated due to a leader change, we need to validate the offsets
2220+
fetcher.validateOffsetsIfNeeded();
2221+
21992222
cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
22002223
if (cachedSubscriptionHashAllFetchPositions) return true;
22012224

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer;
18+
19+
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.utils.Utils;
21+
22+
import java.util.Collections;
23+
import java.util.Map;
24+
import java.util.function.Function;
25+
26+
/**
27+
* In the even of unclean leader election, the log will be truncated,
28+
* previously committed data will be lost, and new data will be written
29+
* over these offsets. When this happens, the consumer will detect the
30+
* truncation and raise this exception (if no automatic reset policy
31+
* has been defined) with the first offset to diverge from what the
32+
* consumer read.
33+
*/
34+
public class LogTruncationException extends OffsetOutOfRangeException {
35+
36+
private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
37+
38+
public LogTruncationException(Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
39+
super(Utils.transformMap(divergentOffsets, Function.identity(), OffsetAndMetadata::offset));
40+
this.divergentOffsets = Collections.unmodifiableMap(divergentOffsets);
41+
}
42+
43+
/**
44+
* Get the offsets for the partitions which were truncated. This is the first offset which is known to diverge
45+
* from what the consumer read.
46+
*/
47+
public Map<TopicPartition, OffsetAndMetadata> divergentOffsets() {
48+
return divergentOffsets;
49+
}
50+
}

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
*/
1717
package org.apache.kafka.clients.consumer;
1818

19+
import org.apache.kafka.clients.Metadata;
1920
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
2021
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
2122
import org.apache.kafka.common.KafkaException;
2223
import org.apache.kafka.common.Metric;
2324
import org.apache.kafka.common.MetricName;
25+
import org.apache.kafka.common.Node;
2426
import org.apache.kafka.common.PartitionInfo;
2527
import org.apache.kafka.common.TopicPartition;
2628
import org.apache.kafka.common.errors.WakeupException;
@@ -189,13 +191,17 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
189191
if (!subscriptions.isPaused(entry.getKey())) {
190192
final List<ConsumerRecord<K, V>> recs = entry.getValue();
191193
for (final ConsumerRecord<K, V> rec : recs) {
192-
if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
193-
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
194+
long position = subscriptions.position(entry.getKey()).offset;
195+
196+
if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > position) {
197+
throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
194198
}
195199

196-
if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
200+
if (assignment().contains(entry.getKey()) && rec.offset() >= position) {
197201
results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
198-
subscriptions.position(entry.getKey(), rec.offset() + 1);
202+
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
203+
rec.offset() + 1, rec.leaderEpoch(), new Metadata.LeaderAndEpoch(Node.noNode(), rec.leaderEpoch()));
204+
subscriptions.position(entry.getKey(), newPosition);
199205
}
200206
}
201207
}
@@ -290,12 +296,12 @@ public synchronized long position(TopicPartition partition) {
290296
ensureNotClosed();
291297
if (!this.subscriptions.isAssigned(partition))
292298
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
293-
Long offset = this.subscriptions.position(partition);
294-
if (offset == null) {
299+
SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
300+
if (position == null) {
295301
updateFetchPosition(partition);
296-
offset = this.subscriptions.position(partition);
302+
position = this.subscriptions.position(partition);
297303
}
298-
return offset;
304+
return position.offset;
299305
}
300306

301307
@Override
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer.internals;
18+
19+
import org.apache.kafka.clients.ClientResponse;
20+
import org.apache.kafka.common.Node;
21+
import org.apache.kafka.common.requests.AbstractRequest;
22+
import org.apache.kafka.common.requests.AbstractResponse;
23+
import org.apache.kafka.common.utils.LogContext;
24+
import org.slf4j.Logger;
25+
26+
public abstract class AsyncClient<T1, Req extends AbstractRequest, Resp extends AbstractResponse, T2> {
27+
28+
private final Logger log;
29+
private final ConsumerNetworkClient client;
30+
31+
AsyncClient(ConsumerNetworkClient client, LogContext logContext) {
32+
this.client = client;
33+
this.log = logContext.logger(getClass());
34+
}
35+
36+
public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
37+
AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node, requestData);
38+
39+
return client.send(node, requestBuilder).compose(new RequestFutureAdapter<ClientResponse, T2>() {
40+
@Override
41+
@SuppressWarnings("unchecked")
42+
public void onSuccess(ClientResponse value, RequestFuture<T2> future) {
43+
Resp resp;
44+
try {
45+
resp = (Resp) value.responseBody();
46+
} catch (ClassCastException cce) {
47+
log.error("Could not cast response body", cce);
48+
future.raise(cce);
49+
return;
50+
}
51+
log.trace("Received {} {} from broker {}", resp.getClass().getSimpleName(), resp, node);
52+
try {
53+
future.complete(handleResponse(node, requestData, resp));
54+
} catch (RuntimeException e) {
55+
if (!future.isDone()) {
56+
future.raise(e);
57+
}
58+
}
59+
}
60+
61+
@Override
62+
public void onFailure(RuntimeException e, RequestFuture<T2> future1) {
63+
future1.raise(e);
64+
}
65+
});
66+
}
67+
68+
protected Logger logger() {
69+
return log;
70+
}
71+
72+
protected abstract AbstractRequest.Builder<Req> prepareRequest(Node node, T1 requestData);
73+
74+
protected abstract T2 handleResponse(Node node, T1 requestData, Resp response);
75+
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.List;
6161
import java.util.Map;
6262
import java.util.Objects;
63+
import java.util.Optional;
6364
import java.util.Set;
6465
import java.util.concurrent.ConcurrentLinkedQueue;
6566
import java.util.concurrent.atomic.AtomicInteger;
@@ -507,10 +508,15 @@ public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
507508

508509
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
509510
final TopicPartition tp = entry.getKey();
510-
final long offset = entry.getValue().offset();
511-
log.info("Setting offset for partition {} to the committed offset {}", tp, offset);
511+
final OffsetAndMetadata offsetAndMetadata = entry.getValue();
512+
final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
513+
final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
514+
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
515+
new ConsumerMetadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.empty()));
516+
517+
log.info("Setting offset for partition {} to the committed offset {}", tp, position);
512518
entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
513-
this.subscriptions.seek(tp, offset);
519+
this.subscriptions.seekAndValidate(tp, position);
514520
}
515521
return true;
516522
}

0 commit comments

Comments
 (0)