Skip to content

Commit 3a3159b

Browse files
authored
KAFKA-18953: [1/N] Add broker side handling for 2 PC (KIP-939) (apache#19193)
This patch adds logic to enable and handle two phase commit (2PC) transactions following KIP-939. The changes made are as follows: 1) Add a new broker config called **transaction.two.phase.commit.enable** which is set to false by default 2) Add new flags **enableTwoPCFlag** and **keepPreparedTxn** to handleInitProducerId 3) Return an error if keepPreparedTxn is set to true (for now) Reviewers: Artem Livshits <[email protected]>, Justine Olshan <[email protected]>
1 parent a5325e0 commit 3a3159b

File tree

11 files changed

+338
-85
lines changed

11 files changed

+338
-85
lines changed

clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,11 @@ public InitProducerIdRequestData data() {
7777
return data;
7878
}
7979

80+
public boolean enable2Pc() {
81+
return data.enable2Pc();
82+
}
83+
84+
public boolean keepPreparedTxn() {
85+
return data.keepPreparedTxn();
86+
}
8087
}

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ object TransactionCoordinator {
5555
config.transactionLogConfig.transactionTopicMinISR,
5656
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
5757
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
58+
config.transactionStateManagerConfig.transaction2PCEnabled,
5859
config.requestTimeoutMs)
5960

6061
val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig,
@@ -109,6 +110,8 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
109110

110111
def handleInitProducerId(transactionalId: String,
111112
transactionTimeoutMs: Int,
113+
enableTwoPCFlag: Boolean,
114+
keepPreparedTxn: Boolean,
112115
expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
113116
responseCallback: InitProducerIdCallback,
114117
requestLocal: RequestLocal = RequestLocal.noCaching): Unit = {
@@ -125,10 +128,20 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
125128
// if transactional id is empty then return error as invalid request. This is
126129
// to make TransactionCoordinator's behavior consistent with producer client
127130
responseCallback(initTransactionError(Errors.INVALID_REQUEST))
128-
} else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
131+
} else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
132+
// if the request is to enable two-phase commit but the broker 2PC config is set to false,
133+
// 2PC functionality is disabled, clients that attempt to use this functionality
134+
// would receive an authorization failed error.
135+
responseCallback(initTransactionError(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
136+
} else if (keepPreparedTxn) {
137+
// if the request is to keep the prepared transaction, then return an
138+
// unsupported version error since the feature hasn't been implemented yet.
139+
responseCallback(initTransactionError(Errors.UNSUPPORTED_VERSION))
140+
} else if (!txnManager.validateTransactionTimeoutMs(enableTwoPCFlag, transactionTimeoutMs)) {
129141
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
130142
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
131143
} else {
144+
val resolvedTxnTimeoutMs = if (enableTwoPCFlag) Int.MaxValue else transactionTimeoutMs
132145
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
133146
case None =>
134147
try {
@@ -138,7 +151,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
138151
nextProducerId = RecordBatch.NO_PRODUCER_ID,
139152
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
140153
lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
141-
txnTimeoutMs = transactionTimeoutMs,
154+
txnTimeoutMs = resolvedTxnTimeoutMs,
142155
state = Empty,
143156
topicPartitions = collection.mutable.Set.empty[TopicPartition],
144157
txnLastUpdateTimestamp = time.milliseconds(),
@@ -157,7 +170,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
157170
val txnMetadata = existingEpochAndMetadata.transactionMetadata
158171

159172
txnMetadata.inLock {
160-
prepareInitProducerIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata,
173+
prepareInitProducerIdTransit(transactionalId, resolvedTxnTimeoutMs, coordinatorEpoch, txnMetadata,
161174
expectedProducerIdAndEpoch)
162175
}
163176
}

core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,12 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
419419
*/
420420
def isProducerEpochExhausted: Boolean = TransactionMetadata.isEpochExhausted(producerEpoch)
421421

422+
/**
423+
* Check if this is a distributed two phase commit transaction.
424+
* Such transactions have no timeout (identified by maximum value for timeout).
425+
*/
426+
def isDistributedTwoPhaseCommitTxn: Boolean = txnTimeoutMs == Int.MaxValue
427+
422428
private def hasPendingTransaction: Boolean = {
423429
state match {
424430
case Ongoing | PrepareAbort | PrepareCommit => true

core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ class TransactionStateManager(brokerId: Int,
108108
version
109109
}
110110

111+
private[transaction] def isTransaction2pcEnabled(): Boolean = { config.transaction2PCEnable }
112+
111113
// visible for testing only
112114
private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
113115
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -130,7 +132,9 @@ class TransactionStateManager(brokerId: Int,
130132
} else {
131133
txnMetadata.state match {
132134
case Ongoing =>
133-
txnMetadata.txnStartTimestamp + txnMetadata.txnTimeoutMs < now
135+
// Do not apply timeout to distributed two phase commit transactions.
136+
(!txnMetadata.isDistributedTwoPhaseCommitTxn) &&
137+
(txnMetadata.txnStartTimestamp + txnMetadata.txnTimeoutMs < now)
134138
case _ => false
135139
}
136140
}
@@ -396,10 +400,18 @@ class TransactionStateManager(brokerId: Int,
396400
}
397401

398402
/**
399-
* Validate the given transaction timeout value
403+
* Validates the provided transaction timeout.
404+
* - If 2PC is enabled, the timeout is always valid (set to Int.MAX by default).
405+
* - Otherwise, the timeout must be a positive value and not exceed the
406+
* configured transaction max timeout.
407+
*
408+
* @param enableTwoPC Whether Two-Phase Commit (2PC) is enabled.
409+
* @param txnTimeoutMs The requested transaction timeout in milliseconds.
410+
* @return `true` if the timeout is valid, `false` otherwise.
400411
*/
401-
def validateTransactionTimeoutMs(txnTimeoutMs: Int): Boolean =
402-
txnTimeoutMs <= config.transactionMaxTimeoutMs && txnTimeoutMs > 0
412+
def validateTransactionTimeoutMs(enableTwoPC: Boolean, txnTimeoutMs: Int): Boolean = {
413+
enableTwoPC || (txnTimeoutMs <= config.transactionMaxTimeoutMs && txnTimeoutMs > 0)
414+
}
403415

404416
def transactionTopicConfigs: Properties = {
405417
val props = new Properties
@@ -826,6 +838,7 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I
826838
transactionLogMinInsyncReplicas: Int = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT,
827839
abortTimedOutTransactionsIntervalMs: Int = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT,
828840
removeExpiredTransactionalIdsIntervalMs: Int = TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT,
841+
transaction2PCEnable: Boolean = TransactionStateManagerConfig.TRANSACTIONS_2PC_ENABLED_DEFAULT,
829842
requestTimeoutMs: Int = ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT)
830843

831844
case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short) {

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,8 +1554,19 @@ class KafkaApis(val requestChannel: RequestChannel,
15541554
}
15551555

15561556
producerIdAndEpoch match {
1557-
case Right(producerIdAndEpoch) => txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs,
1558-
producerIdAndEpoch, sendResponseCallback, requestLocal)
1557+
case Right(producerIdAndEpoch) =>
1558+
val enableTwoPC = initProducerIdRequest.enable2Pc()
1559+
val keepPreparedTxn = initProducerIdRequest.keepPreparedTxn()
1560+
1561+
txnCoordinator.handleInitProducerId(
1562+
transactionalId,
1563+
initProducerIdRequest.data.transactionTimeoutMs,
1564+
enableTwoPC,
1565+
keepPreparedTxn,
1566+
producerIdAndEpoch,
1567+
sendResponseCallback,
1568+
requestLocal
1569+
)
15591570
case Left(error) => requestHelper.sendErrorResponseMaybeThrottle(request, error.exception)
15601571
}
15611572
}

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,10 +523,18 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
523523

524524
class InitProducerIdOperation(val producerIdAndEpoch: Option[ProducerIdAndEpoch] = None) extends TxnOperation[InitProducerIdResult] {
525525
override def run(txn: Transaction): Unit = {
526-
transactionCoordinator.handleInitProducerId(txn.transactionalId, 60000, producerIdAndEpoch, resultCallback,
527-
RequestLocal.withThreadConfinedCaching)
526+
transactionCoordinator.handleInitProducerId(
527+
txn.transactionalId,
528+
60000,
529+
enableTwoPCFlag = false,
530+
keepPreparedTxn = false,
531+
producerIdAndEpoch,
532+
resultCallback,
533+
RequestLocal.withThreadConfinedCaching
534+
)
528535
replicaManager.tryCompleteActions()
529536
}
537+
530538
override def awaitAndVerify(txn: Transaction): Unit = {
531539
val initPidResult = result.getOrElse(throw new IllegalStateException("InitProducerId has not completed"))
532540
assertEquals(Errors.NONE, initPidResult.error)

0 commit comments

Comments
 (0)