Skip to content

Commit 6aef94e

Browse files
authored
KAFKA-18411 Remove ZkProducerIdManager (apache#18413)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent c40cc57 commit 6aef94e

File tree

3 files changed

+1
-196
lines changed

3 files changed

+1
-196
lines changed

core/src/main/scala/kafka/controller/KafkaController.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
2222
import kafka.common._
2323
import kafka.cluster.Broker
2424
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
25-
import kafka.coordinator.transaction.ZkProducerIdManager
2625
import kafka.server._
2726
import kafka.server.metadata.ZkFinalizedFeatureCache
2827
import kafka.utils._
@@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperException.Code
5251
import scala.collection.{Map, Seq, Set, immutable, mutable}
5352
import scala.collection.mutable.ArrayBuffer
5453
import scala.jdk.CollectionConverters._
55-
import scala.util.{Failure, Success, Try}
54+
import scala.util.{Failure, Try}
5655

5756
sealed trait ElectionTrigger
5857
case object AutoTriggered extends ElectionTrigger
@@ -2545,17 +2544,6 @@ class KafkaController(val config: KafkaConfig,
25452544
callback.apply(Left(Errors.STALE_BROKER_EPOCH))
25462545
return
25472546
}
2548-
2549-
val maybeNewProducerIdsBlock = try {
2550-
Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this))
2551-
} catch {
2552-
case ke: KafkaException => Failure(ke)
2553-
}
2554-
2555-
maybeNewProducerIdsBlock match {
2556-
case Failure(exception) => callback.apply(Left(Errors.forException(exception)))
2557-
case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock))
2558-
}
25592547
}
25602548

25612549
private def processControllerChange(): Unit = {

core/src/main/scala/kafka/coordinator/transaction/ZkProducerIdManager.scala

Lines changed: 0 additions & 102 deletions
This file was deleted.

core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala

Lines changed: 0 additions & 81 deletions
This file was deleted.

0 commit comments

Comments
 (0)