Skip to content

Commit 4b2a310

Browse files
KAFKA-19147: Start authorizer before group coordinator to ensure coordinator authorizes regex topics (apache#19488)
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813) added `Topic:Describe` authorization of topics matching regex patterns to the group coordinator since it was difficult to authorize these in the broker when processing consumer heartbeats using the new protocol. But group coordinator is started in `BrokerServer` before the authorizer is created. And hence group coordinator doesn't have an authorizer and never performs authorization. As a result, topics that are not authorized for `Describe` may be assigned to consumers. This potentially leaks information about topic existence, topic id and partition count to users who are not authorized to describe a topic. This PR starts authorizer earlier to ensure that authorization is performed by the group coordinator. Also adds integration tests for verification. Note that we still have a second issue when members have different permissions. If regex is resolved by a member with permission to more topics, unauthorized topics may be assigned to members with lower permissions. In this case, we still return assignment containing topic id and partitions to the member without `Topic:Describe` access. This is not addressed by this PR, but an integration test that illustrates the issue has been added so that we can verify when the issue is fixed. Reviewers: David Jacot <[email protected]>
1 parent 58ad29c commit 4b2a310

File tree

2 files changed

+120
-6
lines changed

2 files changed

+120
-6
lines changed

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ class BrokerServer(
358358
/* start token manager */
359359
tokenManager = new DelegationTokenManager(new DelegationTokenManagerConfigs(config), tokenCache)
360360

361+
// Create and initialize an authorizer if one is configured.
362+
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)
363+
361364
/* initializing the groupConfigManager */
362365
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
363366

@@ -412,9 +415,6 @@ class BrokerServer(
412415
logManager.readBrokerEpochFromCleanShutdownFiles()
413416
)
414417

415-
// Create and initialize an authorizer if one is configured.
416-
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)
417-
418418
// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
419419
// for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
420420
val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
3737
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
3838
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
3939
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
40-
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
40+
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
41+
import org.apache.kafka.common.network.ListenerName
4142
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
4243
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
4344
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
@@ -2547,6 +2548,118 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
25472548
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
25482549
}
25492550

2551+
@ParameterizedTest
2552+
@ValueSource(strings = Array("kraft"))
2553+
def testConsumerGroupHeartbeaWithRegex(quorum: String): Unit = {
2554+
createTopicWithBrokerPrincipal(topic)
2555+
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
2556+
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
2557+
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
2558+
2559+
val response = sendAndReceiveFirstRegexHeartbeat(Uuid.randomUuid.toString, listenerName)
2560+
sendAndReceiveRegexHeartbeat(response, listenerName, Some(1))
2561+
}
2562+
2563+
@ParameterizedTest
2564+
@ValueSource(strings = Array("kraft"))
2565+
def testConsumerGroupHeartbeaWithRegexWithoutTopicDescribeAcl(quorum: String): Unit = {
2566+
createTopicWithBrokerPrincipal(topic)
2567+
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
2568+
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
2569+
2570+
val response = sendAndReceiveFirstRegexHeartbeat(Uuid.randomUuid.toString, listenerName)
2571+
sendAndReceiveRegexHeartbeat(response, listenerName, None)
2572+
}
2573+
2574+
@ParameterizedTest
2575+
@ValueSource(strings = Array("kraft"))
2576+
def testConsumerGroupHeartbeaWithRegexWithDifferentMemberAcls(quorum: String): Unit = {
2577+
createTopicWithBrokerPrincipal(topic, numPartitions = 2)
2578+
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
2579+
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
2580+
2581+
// Member on inter-broker listener has all access and is assigned the matching topic
2582+
var member1Response = sendAndReceiveFirstRegexHeartbeat("memberWithAllAccess", interBrokerListenerName)
2583+
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(2))
2584+
2585+
// Member on client listener has no topic describe access, but is assigned a partition of the
2586+
// unauthorized topic. This is leaking unauthorized topic metadata to member2. Simply filtering out
2587+
// the topic from the assignment in the response is not sufficient since different assignment states
2588+
// in the broker and client can lead to other issues. This needs to be fixed properly by using
2589+
// member permissions while computing assignments.
2590+
var member2Response = sendAndReceiveFirstRegexHeartbeat("memberWithLimitedAccess", listenerName)
2591+
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(1))
2592+
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, None, fullRequest = true)
2593+
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(1))
2594+
2595+
// Create another topic and send heartbeats on member1 to trigger regex refresh
2596+
createTopicWithBrokerPrincipal("topic2", numPartitions = 2)
2597+
TestUtils.retry(15000) {
2598+
member1Response = sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(2))
2599+
}
2600+
// This is leaking unauthorized topic metadata to member2.
2601+
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(2))
2602+
2603+
// Create another topic and send heartbeats on member2 to trigger regex refresh
2604+
createTopicWithBrokerPrincipal("topic3", numPartitions = 2)
2605+
TestUtils.retry(15000) {
2606+
member2Response = sendAndReceiveRegexHeartbeat(member2Response, listenerName, Some(0), fullRequest = true)
2607+
}
2608+
// This removes all topics from member1 since member2's permissions were used to refresh regex.
2609+
sendAndReceiveRegexHeartbeat(member1Response, interBrokerListenerName, Some(0), fullRequest = true)
2610+
}
2611+
2612+
private def sendAndReceiveFirstRegexHeartbeat(memberId: String,
2613+
listenerName: ListenerName): ConsumerGroupHeartbeatResponseData = {
2614+
val request = new ConsumerGroupHeartbeatRequest.Builder(
2615+
new ConsumerGroupHeartbeatRequestData()
2616+
.setGroupId(group)
2617+
.setMemberId(memberId)
2618+
.setMemberEpoch(0)
2619+
.setRebalanceTimeoutMs(5 * 60 * 1000)
2620+
.setTopicPartitions(Collections.emptyList())
2621+
.setSubscribedTopicRegex("^top.*")).build()
2622+
val resource = Set[ResourceType](GROUP, TOPIC)
2623+
val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true, listenerName = listenerName)
2624+
.data.asInstanceOf[ConsumerGroupHeartbeatResponseData]
2625+
assertEquals(Errors.NONE.code, response.errorCode, s"Unexpected response $response")
2626+
assertEquals(0, response.assignment.topicPartitions.size, s"Unexpected assignment $response")
2627+
response
2628+
}
2629+
2630+
private def sendAndReceiveRegexHeartbeat(lastResponse: ConsumerGroupHeartbeatResponseData,
2631+
listenerName: ListenerName,
2632+
expectedAssignmentSize: Option[Int],
2633+
fullRequest: Boolean = false): ConsumerGroupHeartbeatResponseData = {
2634+
var data = new ConsumerGroupHeartbeatRequestData()
2635+
.setGroupId(group)
2636+
.setMemberId(lastResponse.memberId)
2637+
.setMemberEpoch(lastResponse.memberEpoch)
2638+
if (fullRequest) {
2639+
val partitions = Option(lastResponse.assignment).map(_.topicPartitions.asScala.map(p =>
2640+
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
2641+
.setTopicId(p.topicId)
2642+
.setPartitions(p.partitions)
2643+
)).getOrElse(List())
2644+
data = data
2645+
.setTopicPartitions(partitions.asJava)
2646+
.setSubscribedTopicRegex("^top.*")
2647+
}
2648+
val request = new ConsumerGroupHeartbeatRequest.Builder(data).build()
2649+
val resource = Set[ResourceType](GROUP, TOPIC)
2650+
val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true, listenerName = listenerName)
2651+
.data.asInstanceOf[ConsumerGroupHeartbeatResponseData]
2652+
assertEquals(Errors.NONE.code, response.errorCode, s"Unexpected response $response")
2653+
expectedAssignmentSize match {
2654+
case Some(size) =>
2655+
assertNotNull(response.assignment, s"Unexpected assignment $response")
2656+
assertEquals(size, response.assignment.topicPartitions.asScala.map(_.partitions.size).sum, s"Unexpected assignment $response")
2657+
case None =>
2658+
assertNull(response.assignment, s"Unexpected assignment $response")
2659+
}
2660+
response
2661+
}
2662+
25502663
private def createConsumerGroupToDescribe(): Unit = {
25512664
createTopicWithBrokerPrincipal(topic)
25522665
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
@@ -2651,9 +2764,10 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
26512764
resources: Set[ResourceType],
26522765
isAuthorized: Boolean,
26532766
topicExists: Boolean = true,
2654-
topicNames: Map[Uuid, String] = getTopicNames()): AbstractResponse = {
2767+
topicNames: Map[Uuid, String] = getTopicNames(),
2768+
listenerName: ListenerName = listenerName): AbstractResponse = {
26552769
val apiKey = request.apiKey
2656-
val response = connectAndReceive[AbstractResponse](request)
2770+
val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName)
26572771
val error = requestKeyToError(topicNames, request.version())(apiKey).asInstanceOf[AbstractResponse => Errors](response)
26582772

26592773
val authorizationErrors = resources.flatMap { resourceType =>

0 commit comments

Comments
 (0)