Skip to content

Commit b937ec7

Browse files
author
Boyang Chen
authored
KAFKA-9911: Add new PRODUCER_FENCED error code (#8549)
Add a separate error code as PRODUCER_FENCED to differentiate INVALID_PRODUCER_EPOCH. On broker side, replace INVALID_PRODUCER_EPOCH with PRODUCER_FENCED when the request version is the latest, while still returning INVALID_PRODUCER_EPOCH to older clients. On client side, simply handling INVALID_PRODUCER_EPOCH the same as PRODUCER_FENCED if from txn coordinator APIs. Reviewers: Guozhang Wang <[email protected]>
1 parent 89e12f3 commit b937ec7

23 files changed

+571
-107
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

+23-13
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
379379

380380
EndTxnHandler handler = new EndTxnHandler(builder);
381381
enqueueRequest(handler);
382-
if (!shouldBumpEpoch()) {
382+
if (!epochBumpRequired) {
383383
return handler.result;
384384
}
385385
}
@@ -553,10 +553,6 @@ synchronized void requestEpochBumpForPartition(TopicPartition tp) {
553553
this.partitionsToRewriteSequences.add(tp);
554554
}
555555

556-
private boolean shouldBumpEpoch() {
557-
return epochBumpRequired;
558-
}
559-
560556
private void bumpIdempotentProducerEpoch() {
561557
if (this.producerIdAndEpoch.epoch == Short.MAX_VALUE) {
562558
resetIdempotentProducerId();
@@ -577,7 +573,7 @@ private void bumpIdempotentProducerEpoch() {
577573

578574
synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
579575
if (!isTransactional()) {
580-
if (shouldBumpEpoch()) {
576+
if (epochBumpRequired) {
581577
bumpIdempotentProducerEpoch();
582578
}
583579
if (currentState != State.INITIALIZING && !hasProducerId()) {
@@ -1014,10 +1010,14 @@ synchronized boolean canRetry(ProduceResponse.PartitionResponse response, Produc
10141010

10151011
if (!isTransactional()) {
10161012
// For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
1017-
// producer ID and epoch, request a bump of the epoch. Otherwise just retry, as the
1013+
// producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
10181014
requestEpochBumpForPartition(batch.topicPartition);
10191015
return true;
10201016
}
1017+
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
1018+
// Retry the initProducerId to bump the epoch and continue.
1019+
requestEpochBumpForPartition(batch.topicPartition);
1020+
return true;
10211021
} else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
10221022
if (!hasUnresolvedSequence(batch.topicPartition) &&
10231023
(batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
@@ -1366,6 +1366,10 @@ public void handleResponse(AbstractResponse response) {
13661366
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
13671367
error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
13681368
fatalError(error.exception());
1369+
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
1370+
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
1371+
// just treat it the same as PRODUCE_FENCED.
1372+
fatalError(Errors.PRODUCER_FENCED.exception());
13691373
} else {
13701374
fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
13711375
}
@@ -1417,8 +1421,10 @@ public void handleResponse(AbstractResponse response) {
14171421
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
14181422
reenqueue();
14191423
return;
1420-
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
1421-
fatalError(error.exception());
1424+
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
1425+
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
1426+
// just treat it the same as PRODUCE_FENCED.
1427+
fatalError(Errors.PRODUCER_FENCED.exception());
14221428
return;
14231429
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
14241430
fatalError(error.exception());
@@ -1575,8 +1581,10 @@ public void handleResponse(AbstractResponse response) {
15751581
reenqueue();
15761582
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
15771583
reenqueue();
1578-
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
1579-
fatalError(error.exception());
1584+
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
1585+
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
1586+
// just treat it the same as PRODUCE_FENCED.
1587+
fatalError(Errors.PRODUCER_FENCED.exception());
15801588
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
15811589
fatalError(error.exception());
15821590
} else if (error == Errors.INVALID_TXN_STATE) {
@@ -1632,8 +1640,10 @@ public void handleResponse(AbstractResponse response) {
16321640
reenqueue();
16331641
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
16341642
abortableErrorIfPossible(error.exception());
1635-
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
1636-
fatalError(error.exception());
1643+
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
1644+
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
1645+
// just treat it the same as PRODUCE_FENCED.
1646+
fatalError(Errors.PRODUCER_FENCED.exception());
16371647
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
16381648
fatalError(error.exception());
16391649
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {

clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.kafka.common.errors;
1818

1919
/**
20-
* A retryable exception is a transient exception that if retried may succeed.
20+
* A retriable exception is a transient exception that if retried may succeed.
2121
*/
2222
public abstract class RetriableException extends ApiException {
2323

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.internals;
18+
19+
import org.apache.kafka.common.errors.RetriableException;
20+
21+
/**
22+
* This exception indicates that the produce request sent to the partition leader
23+
* contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
24+
* will be aborted and can be retried.
25+
*/
26+
public class InvalidProducerEpochException extends RetriableException {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
public InvalidProducerEpochException(String message) {
31+
super(message);
32+
}
33+
}

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
3434
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
3535
import org.apache.kafka.common.errors.FencedLeaderEpochException;
36+
import org.apache.kafka.common.internals.InvalidProducerEpochException;
3637
import org.apache.kafka.common.errors.ListenerNotFoundException;
3738
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
3839
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -228,9 +229,8 @@ public enum Errors {
228229
OutOfOrderSequenceException::new),
229230
DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence number.",
230231
DuplicateSequenceException::new),
231-
INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch. Either there is a newer producer " +
232-
"with the same transactionalId, or the producer's transaction has been expired by the broker.",
233-
ProducerFencedException::new),
232+
INVALID_PRODUCER_EPOCH(47, "Producer attempted to produce with an old epoch.",
233+
InvalidProducerEpochException::new),
234234
INVALID_TXN_STATE(48, "The producer attempted a transactional operation in an invalid state.",
235235
InvalidTxnStateException::new),
236236
INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producer id which is not currently assigned to " +
@@ -320,10 +320,12 @@ public enum Errors {
320320
NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.",
321321
NoReassignmentInProgressException::new),
322322
GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
323-
GroupSubscribedToTopicException::new),
323+
GroupSubscribedToTopicException::new),
324324
INVALID_RECORD(87, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new),
325325
UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new),
326-
THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new);
326+
THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new),
327+
PRODUCER_FENCED(90, "There is a newer producer with the same transactionalId " +
328+
"which fences the current one.", ProducerFencedException::new);
327329

328330
private static final Logger log = LoggerFactory.getLogger(Errors.class);
329331

clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
3232
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
3333
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
34-
* - {@link Errors#INVALID_PRODUCER_EPOCH}
34+
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
35+
* - {@link Errors#PRODUCER_FENCED}
3536
* - {@link Errors#INVALID_TXN_STATE}
3637
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
3738
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}

clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
3939
* - {@link Errors#INVALID_TXN_STATE}
4040
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
41-
* - {@link Errors#INVALID_PRODUCER_EPOCH}
41+
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
42+
* - {@link Errors#PRODUCER_FENCED}
4243
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
4344
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
4445
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}

clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
3333
* - {@link Errors#INVALID_TXN_STATE}
3434
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
35-
* - {@link Errors#INVALID_PRODUCER_EPOCH}
35+
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
36+
* - {@link Errors#PRODUCER_FENCED}
3637
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
3738
*/
3839
public class EndTxnResponse extends AbstractResponse {

clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@
2626

2727
/**
2828
* Possible error codes:
29-
* - {@link Errors#NOT_COORDINATOR}
30-
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
31-
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
32-
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
33-
* - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
29+
*
30+
* - {@link Errors#NOT_COORDINATOR}
31+
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
32+
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
33+
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
34+
* - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
35+
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=3
36+
* - {@link Errors#PRODUCER_FENCED}
3437
*/
3538
public class InitProducerIdResponse extends AbstractResponse {
3639
public final InitProducerIdResponseData data;

clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "request",
1919
"name": "AddOffsetsToTxnRequest",
2020
// Version 1 is the same as version 0.
21-
"validVersions": "0-1",
21+
//
22+
// Version 2 adds the support for new error code PRODUCER_FENCED.
23+
"validVersions": "0-2",
2224
"flexibleVersions": "none",
2325
"fields": [
2426
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

clients/src/main/resources/common/message/AddOffsetsToTxnResponse.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "response",
1919
"name": "AddOffsetsToTxnResponse",
2020
// Starting in version 1, on quota violation brokers send out responses before throttling.
21-
"validVersions": "0-1",
21+
//
22+
// Version 2 adds the support for new error code PRODUCER_FENCED.
23+
"validVersions": "0-2",
2224
"flexibleVersions": "none",
2325
"fields": [
2426
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "request",
1919
"name": "AddPartitionsToTxnRequest",
2020
// Version 1 is the same as version 0.
21-
"validVersions": "0-1",
21+
//
22+
// Version 2 adds the support for new error code PRODUCER_FENCED.
23+
"validVersions": "0-2",
2224
"flexibleVersions": "none",
2325
"fields": [
2426
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "response",
1919
"name": "AddPartitionsToTxnResponse",
2020
// Starting in version 1, on quota violation brokers send out responses before throttling.
21-
"validVersions": "0-1",
21+
//
22+
// Version 2 adds the support for new error code PRODUCER_FENCED.
23+
"validVersions": "0-2",
2224
"flexibleVersions": "none",
2325
"fields": [
2426
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

clients/src/main/resources/common/message/EndTxnRequest.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "request",
1919
"name": "EndTxnRequest",
2020
// Version 1 is the same as version 0.
21-
"validVersions": "0-1",
21+
//
22+
// Version 2 adds the support for new error code PRODUCER_FENCED.
23+
"validVersions": "0-2",
2224
"flexibleVersions": "none",
2325
"fields": [
2426
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

clients/src/main/resources/common/message/EndTxnResponse.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "response",
1919
"name": "EndTxnResponse",
2020
// Starting in version 1, on quota violation, brokers send out responses before throttling.
21-
"validVersions": "0-1",
21+
//
22+
// Version 2 adds the support for new error code PRODUCER_FENCED.
23+
"validVersions": "0-2",
2224
"flexibleVersions": "none",
2325
"fields": [
2426
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

clients/src/main/resources/common/message/InitProducerIdRequest.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
// Version 2 is the first flexible version.
2323
//
2424
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
25-
"validVersions": "0-3",
25+
//
26+
// Version 4 adds the support for new error code PRODUCER_FENCED.
27+
"validVersions": "0-4",
2628
"flexibleVersions": "2+",
2729
"fields": [
2830
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",

clients/src/main/resources/common/message/InitProducerIdResponse.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
// Version 2 is the first flexible version.
2323
//
2424
// Version 3 is the same as version 2.
25-
"validVersions": "0-3",
25+
//
26+
// Version 4 adds the support for new error code PRODUCER_FENCED.
27+
"validVersions": "0-4",
2628
"flexibleVersions": "2+",
2729
"fields": [
2830
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,

0 commit comments

Comments
 (0)