From 78f3b0ceabc1ba1240f1aa75b5861d4c36228aa0 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Fri, 23 May 2025 17:10:51 +0800 Subject: [PATCH 1/6] KAFKA-17747: Replace subscription metadata with metadata hash in share group Signed-off-by: PoAn Yang --- .../group/GroupCoordinatorRecordHelpers.java | 47 ------- .../group/GroupCoordinatorShard.java | 9 -- .../group/GroupMetadataManager.java | 115 +++++++----------- .../apache/kafka/coordinator/group/Utils.java | 16 ++- .../coordinator/group/modern/ModernGroup.java | 40 ++++++ .../group/modern/TopicMetadata.java | 11 -- .../group/modern/share/ShareGroup.java | 1 - .../ShareGroupPartitionMetadataKey.json | 27 ---- .../ShareGroupPartitionMetadataValue.json | 41 ------- .../kafka/coordinator/group/Assertions.java | 25 ---- .../group/GroupMetadataManagerTest.java | 87 +++++++------ .../GroupMetadataManagerTestContext.java | 9 -- .../group/modern/share/ShareGroupBuilder.java | 29 +---- .../consumer/ShareGroupMessageFormatter.java | 1 - .../ShareGroupMessageFormatterTest.java | 31 ----- 15 files changed, 156 insertions(+), 333 deletions(-) delete mode 100644 group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json delete mode 100644 group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 288e8d3abe1df..db4d7736f3843 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -45,8 +45,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; @@ -600,51 +598,6 @@ public static CoordinatorRecord newShareGroupMemberSubscriptionTombstoneRecord( ); } - /** - * Creates a ShareGroupPartitionMetadata record. - * - * @param groupId The group id. - * @param newSubscriptionMetadata The subscription metadata. - * @return The record. - */ - public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord( - String groupId, - Map newSubscriptionMetadata - ) { - ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> - value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(topicMetadata.id()) - .setTopicName(topicMetadata.name()) - .setNumPartitions(topicMetadata.numPartitions()) - ) - ); - - return CoordinatorRecord.record( - new ShareGroupPartitionMetadataKey() - .setGroupId(groupId), - new ApiMessageAndVersion( - value, - (short) 0 - ) - ); - } - - /** - * Creates a ShareGroupPartitionMetadata tombstone. - * - * @param groupId The group id. - * @return The record. 
- */ - public static CoordinatorRecord newShareGroupSubscriptionMetadataTombstoneRecord( - String groupId - ) { - return CoordinatorRecord.tombstone( - new ShareGroupPartitionMetadataKey() - .setGroupId(groupId) - ); - } - /** * Creates a ShareGroupMetadata record. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 564612ebd20ce..5be24e1b319a8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -95,8 +95,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; @@ -1233,13 +1231,6 @@ public void replay( ); break; - case SHARE_GROUP_PARTITION_METADATA: - groupMetadataManager.replay( - (ShareGroupPartitionMetadataKey) key, - (ShareGroupPartitionMetadataValue) Utils.messageOrNull(value) - ); - break; - case SHARE_GROUP_MEMBER_METADATA: groupMetadataManager.replay( (ShareGroupMemberMetadataKey) key, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 7a1c98bc9e1a7..3f99b7603cfc4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -115,8 +115,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; @@ -201,6 +199,7 @@ import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; @@ -237,7 +236,6 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord; 
-import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.Utils.assignmentToString; import static org.apache.kafka.coordinator.group.Utils.ofSentinel; @@ -490,6 +488,11 @@ GroupMetadataManager build() { */ private MetadataImage metadataImage; + /** + * The topic hash value by topic name. + */ + private final Map topicHashCache; + /** * This tracks the version (or the offset) of the last metadata image * with newly created topics. @@ -550,6 +553,7 @@ private GroupMetadataManager( this.shareGroupAssignor = shareGroupAssignor; this.authorizerPlugin = authorizerPlugin; this.streamsGroupAssignors = streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, Function.identity())); + this.topicHashCache = new ConcurrentHashMap<>(); } /** @@ -2536,18 +2540,18 @@ private CoordinatorResult subscriptionMetadata = group.subscriptionMetadata(); + Map subscribedTopicNamesMap = group.subscribedTopicNames(); SubscriptionType subscriptionType = group.subscriptionType(); if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { // The subscription metadata is updated in two cases: // 1) The member has updated its subscriptions; // 2) The refresh deadline has been reached. - Map subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember); - subscriptionMetadata = group.computeSubscriptionMetadata( + subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember); + long groupMetadataHash = group.computeMetadataHash( subscribedTopicNamesMap, - metadataImage.topics(), - metadataImage.cluster() + topicHashCache, + metadataImage ); int numMembers = group.numMembers(); @@ -2560,16 +2564,15 @@ private CoordinatorResult subscribedTopicsChangeMap(String groupId, Map subscriptionMetadata) { - if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) { + Map subscribedTopicsChangeMap(String groupId, Set subscriptionTopicNames) { + if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty()) { return Map.of(); } @@ -2684,18 +2687,20 @@ Map subscribedTopicsChangeMap(String groupId, Map curTimestamp - entry.getValue().timestamp() < delta) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) ); - // Here will add any topics which are subscribed but not initialized and initializing // topics whose timestamp indicates that they are older than delta elapsed. - subscriptionMetadata.forEach((topicName, topicMetadata) -> { - Set alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of(); - if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) { - Set partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet()); - partitionSet.removeAll(alreadyInitializedPartSet); - // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old - // which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we - // are also updating the timestamp here which means, old initializing will not be included repeatedly. 
- topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp)); + subscriptionTopicNames.forEach(topicName -> { + TopicImage topicImage = metadataImage.topics().getTopic(topicName); + if (topicImage != null) { + Set alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? alreadyInitialized.get(topicImage.id()).partitions() : Set.of(); + if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) { + Set partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed().collect(Collectors.toSet()); + partitionSet.removeAll(alreadyInitializedPartSet); + // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old + // which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we + // are also updating the timestamp here which means, old initializing will not be included repeatedly. + topicPartitionChangeMap.computeIfAbsent(topicImage.id(), k -> new InitMapValue(topicImage.name(), partitionSet, curTimestamp)); + } } }); return topicPartitionChangeMap; @@ -2705,22 +2710,22 @@ Map subscribedTopicsChangeMap(String groupId, Map maybeCreateInitializeShareGroupStateRequest( String groupId, int groupEpoch, - Map subscriptionMetadata, + Set subscriptionTopicNames, List records ) { - if (subscriptionMetadata == null || subscriptionMetadata.isEmpty() || metadataImage.isEmpty()) { + if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty() || metadataImage.isEmpty()) { return Optional.empty(); } - Map topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata); + Map topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionTopicNames); // Nothing to initialize. if (topicPartitionChangeMap.isEmpty()) { @@ -4061,21 +4066,20 @@ private CoordinatorResult shareGroupFenceMember( records.add(newShareGroupMemberSubscriptionTombstoneRecord(group.groupId(), member.memberId())); // We update the subscription metadata without the leaving member. - Map subscriptionMetadata = group.computeSubscriptionMetadata( + long groupMetadataHash = group.computeMetadataHash( group.computeSubscribedTopicNames(member, null), - metadataImage.topics(), - metadataImage.cluster() + topicHashCache, + metadataImage ); - if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { - log.info("[GroupId {}] Computed new subscription metadata: {}.", - group.groupId(), subscriptionMetadata); - records.add(newShareGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); + if (groupMetadataHash != group.metadataHash()) { + log.info("[GroupId {}] Computed new metadata hash: {}.", + group.groupId(), groupMetadataHash); } // We bump the group epoch. 
int groupEpoch = group.groupEpoch() + 1; - records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, 0)); + records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash)); cancelGroupSessionTimeout(group.groupId(), member.memberId()); @@ -5372,6 +5376,7 @@ public void replay( if (value != null) { ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, true); shareGroup.setGroupEpoch(value.epoch()); + shareGroup.setMetadataHash(value.metadataHash()); } else { ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false); if (!shareGroup.members().isEmpty()) { @@ -5552,32 +5557,6 @@ public void replay( } } - /** - * Replays ShareGroupPartitionMetadataKey/Value to update the hard state of - * the share group. It updates the subscription metadata of the share - * group. - * - * @param key A ShareGroupPartitionMetadataKey key. - * @param value A ShareGroupPartitionMetadataValue record. - */ - public void replay( - ShareGroupPartitionMetadataKey key, - ShareGroupPartitionMetadataValue value - ) { - String groupId = key.groupId(); - ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false); - - if (value != null) { - Map subscriptionMetadata = new HashMap<>(); - value.topics().forEach(topicMetadata -> - subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata)) - ); - group.setSubscriptionMetadata(subscriptionMetadata); - } else { - group.setSubscriptionMetadata(Map.of()); - } - } - /** * Replays ShareGroupTargetAssignmentMemberKey/Value to update the hard state of * the share group. It updates the target assignment of the member or deletes it. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java index f7441fce0b1c6..0d3d54668782c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java @@ -341,6 +341,7 @@ static void throwIfRegularExpressionIsInvalid( * The computed hash value is stored as the metadata hash in the *GroupMetadataValue. *
* If there is no topic, the hash value is set to 0. + * If a topic hash is 0, ignore the topic. * The hashing process involves the following steps: * 1. Sort the topic hashes by topic name. * 2. Write each topic hash in order. @@ -348,13 +349,20 @@ static void throwIfRegularExpressionIsInvalid( * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. * @return The hash of the group. */ - static long computeGroupHash(Map topicHashes) { - if (topicHashes.isEmpty()) { + public static long computeGroupHash(Map topicHashes) { + List> sortedEntries = new ArrayList<>(); + for (Map.Entry entry : topicHashes.entrySet()) { + // Filter out entries with a hash value of 0, which indicates no topic + if (entry.getValue() != 0) { + sortedEntries.add(entry); + } + } + + if (sortedEntries.isEmpty()) { return 0; } // Sort entries by topic name - List> sortedEntries = new ArrayList<>(topicHashes.entrySet()); sortedEntries.sort(Map.Entry.comparingByKey()); HashStream64 hasher = Hashing.xxh3_64().hashStream(); @@ -386,7 +394,7 @@ static long computeGroupHash(Map topicHashes) { * @param metadataImage The cluster image. * @return The hash of the topic. */ - static long computeTopicHash(String topicName, MetadataImage metadataImage) { + public static long computeTopicHash(String topicName, MetadataImage metadataImage) { TopicImage topicImage = metadataImage.topics().getTopic(topicName); if (topicImage == null) { return 0; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java index 0c6ff25cb1356..50ce7e8a1b5ac 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java @@ -20,13 +20,16 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.Utils; import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineLong; import org.apache.kafka.timeline.TimelineObject; import java.util.Collections; @@ -88,6 +91,11 @@ public static class DeadlineAndEpoch { */ protected final TimelineHashMap subscribedTopicMetadata; + /** + * The metadata hash which is computed based on the all subscribed topics. + */ + protected final TimelineLong metadataHash; + /** * The group's subscription type. * This value is set to Homogeneous by default. 
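
For orientation, the new scheme hashes metadata in two steps: a per-topic xxh3 hash computed from the metadata image (memoized in topicHashCache), and a group-level xxh3 hash over the per-topic hashes sorted by topic name. Below is a minimal stand-alone sketch of the group-level step, using the same hash4j API the patch relies on (Hashing.xxh3_64().hashStream()); the class name is illustrative, not part of the patch:

    import com.dynatrace.hash4j.hashing.HashStream64;
    import com.dynatrace.hash4j.hashing.Hashing;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    final class GroupHashSketch {
        // Combines per-topic hashes into one group-level hash. A per-topic
        // hash of 0 means the topic is missing from the metadata image, so
        // such entries are skipped entirely.
        static long computeGroupHash(Map<String, Long> topicHashes) {
            List<Map.Entry<String, Long>> sorted = new ArrayList<>();
            for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
                if (entry.getValue() != 0) {
                    sorted.add(entry);
                }
            }
            if (sorted.isEmpty()) {
                return 0;
            }
            // Sort by topic name so the result is independent of map iteration order.
            sorted.sort(Map.Entry.comparingByKey());
            HashStream64 hasher = Hashing.xxh3_64().hashStream();
            for (Map.Entry<String, Long> entry : sorted) {
                hasher.putLong(entry.getValue());
            }
            return hasher.getAsLong();
        }
    }

Because the group hash depends only on the sorted per-topic hashes, a changed partition count or a deleted topic changes the value, while re-ordering of subscriptions does not.
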
@@ -134,6 +142,7 @@ protected ModernGroup( this.members = new TimelineHashMap<>(snapshotRegistry, 0); this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0); this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + this.metadataHash = new TimelineLong(snapshotRegistry); this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS); this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); @@ -355,6 +364,13 @@ public Map subscriptionMetadata() { return Collections.unmodifiableMap(subscribedTopicMetadata); } + /** + * @return The metadata hash. + */ + public long metadataHash() { + return metadataHash.get(); + } + /** * Updates the subscription metadata. This replaces the previous one. * @@ -367,6 +383,15 @@ public void setSubscriptionMetadata( this.subscribedTopicMetadata.putAll(subscriptionMetadata); } + /** + * Updates the metadata hash. + * + * @param metadataHash The new metadata hash. + */ + public void setMetadataHash(long metadataHash) { + this.metadataHash.set(metadataHash); + } + /** * Computes the subscription metadata based on the current subscription info. * @@ -398,6 +423,21 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } + public long computeMetadataHash( + Map subscribedTopicNames, + Map topicHashCache, + MetadataImage metadataImage + ) { + Map topicHash = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.keySet().forEach(topicName -> { + topicHash.put( + topicName, + topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage)) + ); + }); + return Utils.computeGroupHash(topicHash); + } + /** * Updates the metadata refresh deadline. 
* diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java index ace670cc0e46c..b5f2f08a063c8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import java.util.Objects; @@ -119,14 +118,4 @@ public static TopicMetadata fromRecord( record.numPartitions() ); } - - public static TopicMetadata fromRecord( - ShareGroupPartitionMetadataValue.TopicMetadata record - ) { - return new TopicMetadata( - record.topicId(), - record.topicName(), - record.numPartitions() - ); - } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index 2d5f5fe3956b8..e0fc1190a2eab 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -274,7 +274,6 @@ public void createGroupTombstoneRecords(List records) { records.add(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId(), memberId)) ); - records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId())); records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId())); records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId())); } diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json deleted file mode 100644 index 146b9fdcb9103..0000000000000 --- a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json +++ /dev/null @@ -1,27 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. -{ - "apiKey": 9, - "type": "coordinator-key", - "name": "ShareGroupPartitionMetadataKey", - "validVersions": "0", - "flexibleVersions": "none", - "fields": [ - { "name": "GroupId", "type": "string", "versions": "0", - "about": "The group id." 
} - ] -} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json deleted file mode 100644 index af63f047126e5..0000000000000 --- a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. -{ - "apiKey": 9, - "type": "coordinator-value", - "name": "ShareGroupPartitionMetadataValue", - "validVersions": "0", - "flexibleVersions": "0+", - "fields": [ - { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata", - "about": "The list of topic metadata.", "fields": [ - { "name": "TopicId", "versions": "0+", "type": "uuid", - "about": "The topic id." }, - { "name": "TopicName", "versions": "0+", "type": "string", - "about": "The topic name." }, - { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." }, - { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", - "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored.", "fields": [ - { "name": "Partition", "versions": "0+", "type": "int32", - "about": "The partition number." }, - { "name": "Racks", "versions": "0+", "type": "[]string", - "about": "The set of racks that the partition is mapped to." 
} - ]} - ]} - ] -} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java index a593bb3d6646b..0d68375d0c55f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java @@ -30,7 +30,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -61,7 +60,6 @@ public class Assertions { ConsumerGroupPartitionMetadataValue.class, Assertions::assertConsumerGroupPartitionMetadataValue, GroupMetadataValue.class, Assertions::assertGroupMetadataValue, ConsumerGroupTargetAssignmentMemberValue.class, Assertions::assertConsumerGroupTargetAssignmentMemberValue, - ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue, ShareGroupStatePartitionMetadataValue.class, Assertions::assertShareGroupStatePartitionMetadataValue ); @@ -264,29 +262,6 @@ private static void assertConsumerGroupPartitionMetadataValue( assertEquals(expected, actual); } - private static void assertShareGroupPartitionMetadataValue( - ApiMessage exp, - ApiMessage act - ) { - // The order of the racks stored in the PartitionMetadata of the ShareGroupPartitionMetadataValue - // is not always guaranteed. Therefore, we need a special comparator. 
- ShareGroupPartitionMetadataValue expected = (ShareGroupPartitionMetadataValue) exp.duplicate(); - ShareGroupPartitionMetadataValue actual = (ShareGroupPartitionMetadataValue) act.duplicate(); - - Consumer normalize = message -> { - message.topics().sort(Comparator.comparing(ShareGroupPartitionMetadataValue.TopicMetadata::topicId)); - message.topics().forEach(topic -> { - topic.partitionMetadata().sort(Comparator.comparing(ShareGroupPartitionMetadataValue.PartitionMetadata::partition)); - topic.partitionMetadata().forEach(partition -> partition.racks().sort(String::compareTo)); - }); - }; - - normalize.accept(expected); - normalize.accept(actual); - - assertEquals(expected, actual); - } - private static void assertShareGroupStatePartitionMetadataValue( ApiMessage exp, ApiMessage act diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 9c644bb7e13a6..22b1ea5cd9fb2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -190,6 +190,8 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_CLIENT_ADDRESS; import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_CLIENT_ID; import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_PROCESS_ID; +import static org.apache.kafka.coordinator.group.Utils.computeGroupHash; +import static org.apache.kafka.coordinator.group.Utils.computeTopicHash; import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; @@ -3247,7 +3249,6 @@ public void testSessionTimeoutExpirationForShareMember() { GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId), - GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 0) ) ) @@ -3266,10 +3267,12 @@ public void testOnLoadedSessionTimeoutExpirationForShareMember() { String fooTopicName = "foo"; String memberId = "foo-1"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build()) + .withMetadataImage(metadataImage) .withShareGroup(new ShareGroupBuilder(groupId, 10) .withMember(new ShareGroupMember.Builder(memberId) .setState(MemberState.STABLE) @@ -3283,7 +3286,10 @@ public void testOnLoadedSessionTimeoutExpirationForShareMember() { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )))) .build(); // Let's assume that all the 
records have been replayed and now @@ -3306,7 +3312,6 @@ public void testOnLoadedSessionTimeoutExpirationForShareMember() { GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId), - GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0) ) ) @@ -15073,11 +15078,10 @@ public void testShareGroupMemberJoinsEmptyGroupWithAssignments() { List expectedRecords = List.of( GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, expectedMember), - GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1, 0), + GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, image), + barTopicName, computeTopicHash(barTopicName, image) + ))), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) @@ -15125,15 +15129,17 @@ public void testShareGroupLeavingMemberBumpsGroupEpoch() { Uuid zarTopicId = Uuid.randomUuid(); String zarTopicName = "zar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withShareGroupAssignor(assignor) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withShareGroup(new ShareGroupBuilder(groupId, 10) .withMember(new ShareGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -15164,7 +15170,12 @@ public void testShareGroupLeavingMemberBumpsGroupEpoch() { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage), + zarTopicName, computeTopicHash(zarTopicName, metadataImage) + )))) .build(); // Member 2 leaves the consumer group. @@ -15195,11 +15206,10 @@ public void testShareGroupLeavingMemberBumpsGroupEpoch() { GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), // Subscription metadata is recomputed because zar is no longer there. 
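// (With this series, that recomputation is reflected as a new metadata hash on the
// group epoch record below, rather than as a separate subscription-metadata record.)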
- GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0) + GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))) ); assertRecordsEquals(expectedRecords, result.records()); @@ -15282,7 +15292,6 @@ public void testShareGroupDeleteTombstones() { List expectedRecords = List.of( GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord(groupId), - GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId), GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId), GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId) ); @@ -21714,7 +21723,7 @@ public void testShareGroupInitializeSuccess() { CoordinatorRecord record = newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), snapshotMetadataInitializeRecordMap, Map.of()); assertNull(result.response()); - assertEquals(List.of(record), result.records()); + assertRecordsEquals(List.of(record), result.records()); // Make sure the timeline map is not modified yet. assertEquals(snapshotMetadataInitializeRecordMap, context.groupMetadataManager.shareGroupStatePartitionMetadata().get(groupId).initializingTopics()); } @@ -21763,23 +21772,25 @@ public void testSubscribedTopicsChangeMap() { .withShareGroupAssignor(assignor) .withTime(time) .withConfig(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, offsetWriteTimeout) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(topicId, topicName, partitions) + .build()) .build(); - // Empty on empty subscription metadata + // Empty on empty subscription topics assertEquals( Map.of(), - context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of()) + context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Set.of()) ); - // No error on empty initialized metadata (no replay of initialized topics) long timeNow = time.milliseconds() + 100; time.setCurrentTimeMs(timeNow); assertEquals( Map.of( topicId, new InitMapValue(topicName, Set.of(0), timeNow) ), - context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( - topicName, new TopicMetadata(topicId, topicName, partitions) + context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Set.of( + topicName )) ); @@ -21817,6 +21828,14 @@ topicName, new TopicMetadata(topicId, topicName, partitions) .setDeletingTopics(List.of()) ); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(t1Id, t1Name, 2) + .addTopic(t2Id, t2Name, 2) + .addTopic(t3Id, t3Name, 3) + .build(); + + context.groupMetadataManager.onNewMetadataImage(metadataImage, new MetadataDelta(metadataImage)); + // Since t1 is initializing and t2 is initialized due to replay above. 
timeNow = timeNow + 2 * offsetWriteTimeout + 1; time.setCurrentTimeMs(timeNow); @@ -21825,10 +21844,10 @@ topicName, new TopicMetadata(topicId, topicName, partitions) t1Id, new InitMapValue(t1Name, Set.of(0, 1), timeNow), // initializing t3Id, new InitMapValue(t3Name, Set.of(0, 1, 2), timeNow) // initialized ), - context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( - t1Name, new TopicMetadata(t1Id, t1Name, 2), - t2Name, new TopicMetadata(t2Id, t2Name, 2), - t3Name, new TopicMetadata(t3Id, t3Name, 3) + context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Set.of( + t1Name, + t2Name, + t3Name )) ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index dc0670656d0a9..8599ec4a7cc3e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -83,8 +83,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; @@ -1681,13 +1679,6 @@ public void replay( ); break; - case SHARE_GROUP_PARTITION_METADATA: - groupMetadataManager.replay( - (ShareGroupPartitionMetadataKey) key, - (ShareGroupPartitionMetadataValue) messageOrNull(value) - ); - break; - case SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER: groupMetadataManager.replay( (ShareGroupTargetAssignmentMemberKey) key, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java index 8298358231022..458326c9a2e56 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java @@ -21,7 +21,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.TopicMetadata; -import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; @@ -37,6 +36,7 @@ public class ShareGroupBuilder { private final Map members = new HashMap<>(); private final Map assignments = new HashMap<>(); private Map subscriptionMetadata; + private long metadataHash = 0L; public ShareGroupBuilder(String groupId, int groupEpoch) { this.groupId = groupId; @@ -49,8 +49,8 @@ public ShareGroupBuilder withMember(ShareGroupMember member) { return this; } - public ShareGroupBuilder withSubscriptionMetadata(Map subscriptionMetadata) { - this.subscriptionMetadata = subscriptionMetadata; + public ShareGroupBuilder withMetadataHash(long 
metadataHash) { + this.metadataHash = metadataHash; return this; } @@ -72,29 +72,8 @@ public List build(TopicsImage topicsImage) { records.add(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, member)) ); - // Add subscription metadata. - if (subscriptionMetadata == null) { - subscriptionMetadata = new HashMap<>(); - members.forEach((memberId, member) -> - member.subscribedTopicNames().forEach(topicName -> { - TopicImage topicImage = topicsImage.getTopic(topicName); - if (topicImage != null) { - subscriptionMetadata.put(topicName, new TopicMetadata( - topicImage.id(), - topicImage.name(), - topicImage.partitions().size() - )); - } - }) - ); - } - - if (!subscriptionMetadata.isEmpty()) { - records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); - } - // Add group epoch record. - records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, 0)); + records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, metadataHash)); // Add target assignment records. assignments.forEach((memberId, assignment) -> diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatter.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatter.java index 5477ffa1c0e58..7ad3dbb75a760 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatter.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatter.java @@ -31,7 +31,6 @@ */ public class ShareGroupMessageFormatter extends CoordinatorRecordMessageFormatter { private static final Set ALLOWED_RECORDS = Set.of( - CoordinatorRecordType.SHARE_GROUP_PARTITION_METADATA.id(), CoordinatorRecordType.SHARE_GROUP_MEMBER_METADATA.id(), CoordinatorRecordType.SHARE_GROUP_METADATA.id(), CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_METADATA.id(), diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatterTest.java index 96e8681c00d61..340fdd37bb56e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ShareGroupMessageFormatterTest.java @@ -24,8 +24,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; @@ -39,16 +37,6 @@ import java.util.stream.Stream; public class ShareGroupMessageFormatterTest extends CoordinatorRecordMessageFormatterTest { - - private static final ShareGroupPartitionMetadataKey SHARE_GROUP_PARTITION_METADATA_KEY = new ShareGroupPartitionMetadataKey() - .setGroupId("group-id"); - private static final ShareGroupPartitionMetadataValue SHARE_GROUP_PARTITION_METADATA_VALUE = new ShareGroupPartitionMetadataValue() - .setTopics(List.of(new 
ShareGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(Uuid.ONE_UUID) - .setTopicName("topic") - .setNumPartitions(1) - .setPartitionMetadata(List.of())) - ); private static final ShareGroupMemberMetadataKey SHARE_GROUP_MEMBER_METADATA_KEY = new ShareGroupMemberMetadataKey() .setGroupId("group-id") .setMemberId("member-id"); @@ -111,25 +99,6 @@ protected CoordinatorRecordMessageFormatter formatter() { @Override protected Stream parameters() { return Stream.of( - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 9, SHARE_GROUP_PARTITION_METADATA_KEY).array(), - MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_PARTITION_METADATA_VALUE).array(), - """ - {"key":{"type":9,"data":{"groupId":"group-id"}}, - "value":{"version":0, - "data":{"topics":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ", - "topicName":"topic", - "numPartitions":1, - "partitionMetadata":[]}]}}} - """ - ), - Arguments.of( - MessageUtil.toVersionPrefixedByteBuffer((short) 9, SHARE_GROUP_PARTITION_METADATA_KEY).array(), - null, - """ - {"key":{"type":9,"data":{"groupId":"group-id"}},"value":null} - """ - ), Arguments.of( MessageUtil.toVersionPrefixedByteBuffer((short) 10, SHARE_GROUP_MEMBER_METADATA_KEY).array(), MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_MEMBER_METADATA_VALUE).array(), From 4fd9bc408bf7ae2d4cef06b7a56e62a9aeb289bb Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Sat, 24 May 2025 12:38:25 +0800 Subject: [PATCH 2/6] fix failed test case Signed-off-by: PoAn Yang --- .../ShareGroupHeartbeatRequestTest.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala index 865870eef3beb..0420f70665844 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala @@ -122,7 +122,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment) // Leave the group. @@ -241,7 +241,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions() // Verify the response. - assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) // Prepare the next heartbeat for member 2. shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( @@ -260,7 +260,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions() // Verify the response. - assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]() topicPartitionsAssignedToMember1.forEach(topicPartition => { @@ -290,7 +290,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. 
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) } finally { admin.close() } @@ -365,7 +365,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) // Member leaves the group. shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( @@ -396,7 +396,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) // Verify the response for member 1. - assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId) // Partition assignment remains intact on rejoining. assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment) @@ -481,7 +481,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not get partitions for topic foo and bar assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) // Create the topic baz. val bazTopicId = TestUtils.createTopicWithAdminRaw( admin = admin, @@ -505,7 +505,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ShareGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(memberId) - .setMemberEpoch(3) + .setMemberEpoch(2) ).build() TestUtils.waitUntilTrue(() => { @@ -516,7 +516,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not get partitions for topic baz assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) // Increasing the partitions of topic bar which is already being consumed in the share group. increasePartitions(admin, "bar", 6) @@ -536,7 +536,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ShareGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(memberId) - .setMemberEpoch(5) + .setMemberEpoch(3) ).build() TestUtils.waitUntilTrue(() => { @@ -547,7 +547,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not update partitions assignment for topic bar. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(7, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) // Delete the topic foo. 
TestUtils.deleteTopicWithAdmin( admin = admin, @@ -569,7 +569,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ShareGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(memberId) - .setMemberEpoch(7) + .setMemberEpoch(4) ).build() TestUtils.waitUntilTrue(() => { @@ -580,7 +580,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not update partitions assignment for topic foo. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch) } finally { admin.close() } @@ -900,7 +900,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment == expectedAssignment }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) // Restart the only running broker. val broker = cluster.brokers().values().iterator().next() From c5860e3c82b138abd6dff737e9d22512265a79ab Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 29 May 2025 08:42:52 +0900 Subject: [PATCH 3/6] fix failed test case Signed-off-by: PoAn Yang --- .../ShareGroupHeartbeatRequestTest.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala index 0420f70665844..865870eef3beb 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala @@ -122,7 +122,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment) // Leave the group. @@ -241,7 +241,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions() // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) // Prepare the next heartbeat for member 2. shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( @@ -260,7 +260,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions() // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]() topicPartitionsAssignedToMember1.forEach(topicPartition => { @@ -290,7 +290,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. 
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) } finally { admin.close() } @@ -365,7 +365,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) // Member leaves the group. shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( @@ -396,7 +396,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) // Verify the response for member 1. - assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch) assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId) // Partition assignment remains intact on rejoining. assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment) @@ -481,7 +481,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not get partitions for topic foo and bar assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) // Create the topic baz. val bazTopicId = TestUtils.createTopicWithAdminRaw( admin = admin, @@ -505,7 +505,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ShareGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(memberId) - .setMemberEpoch(2) + .setMemberEpoch(3) ).build() TestUtils.waitUntilTrue(() => { @@ -516,7 +516,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not get partitions for topic baz assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch) // Increasing the partitions of topic bar which is already being consumed in the share group. increasePartitions(admin, "bar", 6) @@ -536,7 +536,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ShareGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(memberId) - .setMemberEpoch(3) + .setMemberEpoch(5) ).build() TestUtils.waitUntilTrue(() => { @@ -547,7 +547,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not update partitions assignment for topic bar. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(7, shareGroupHeartbeatResponse.data.memberEpoch) // Delete the topic foo. 
TestUtils.deleteTopicWithAdmin( admin = admin, @@ -569,7 +569,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ShareGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(memberId) - .setMemberEpoch(4) + .setMemberEpoch(7) ).build() TestUtils.waitUntilTrue(() => { @@ -580,7 +580,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions) }, msg = s"Could not update partitions assignment for topic foo. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch) } finally { admin.close() } @@ -900,7 +900,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { shareGroupHeartbeatResponse.data.assignment == expectedAssignment }, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.") // Verify the response. - assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch) + assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch) // Restart the only running broker. val broker = cluster.brokers().values().iterator().next() From 7e7754a6ead0fef6c441f9ae87c29799abc74de8 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Fri, 30 May 2025 11:00:40 +0900 Subject: [PATCH 4/6] remove error comment Signed-off-by: PoAn Yang --- .../src/main/java/org/apache/kafka/coordinator/group/Utils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java index d5cd40b38bc4f..d614123d2a72b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java @@ -341,7 +341,6 @@ static void throwIfRegularExpressionIsInvalid( * The computed hash value is stored as the metadata hash in the *GroupMetadataValue. *
* If there is no topic, the hash value is set to 0. - * If a topic hash is 0, ignore the topic. * The hashing process involves the following steps: * 1. Sort the topic hashes by topic name. * 2. Write each topic hash in order. From d78960b3b2340db3d40b8c21ebecdcb3e5384a01 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Sat, 31 May 2025 22:53:10 +0900 Subject: [PATCH 5/6] address comment Signed-off-by: PoAn Yang --- .../apache/kafka/coordinator/group/GroupMetadataManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 8cbc9a3ec4428..ed1eb3f721374 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2695,8 +2695,8 @@ Map subscribedTopicsChangeMap(String groupId, Set su if (topicImage != null) { Set alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? alreadyInitialized.get(topicImage.id()).partitions() : Set.of(); if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) { - Set partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed().collect(Collectors.toSet()); - partitionSet.removeAll(alreadyInitializedPartSet); + Set partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed() + .filter(p -> !alreadyInitializedPartSet.contains(p)).collect(Collectors.toSet()); // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old // which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we // are also updating the timestamp here which means, old initializing will not be included repeatedly. From fa6ed76525cd8f2d2cc527e7ad58efd0ae6fbfd1 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 3 Jun 2025 13:45:01 +0800 Subject: [PATCH 6/6] address comment Signed-off-by: PoAn Yang --- .../apache/kafka/coordinator/group/GroupMetadataManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 6fff344407fdd..7d286ffb7413b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2574,7 +2574,7 @@ private CoordinatorResult
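
Net effect of the series: the ShareGroupPartitionMetadataKey/Value schemas are deleted, the group epoch record (ShareGroupMetadataValue) carries a single 64-bit metadata hash, and per-topic hashes are memoized in the ConcurrentHashMap topicHashCache. A hypothetical sketch of the resulting change-detection flow on the coordinator — the identifiers mirror the patch, but this is a simplified illustration rather than the verbatim production code:

    // Recompute the group-level hash from the current subscriptions; per-topic
    // hashes are served from topicHashCache and only computed on a cache miss.
    long newHash = group.computeMetadataHash(subscribedTopicNames, topicHashCache, metadataImage);
    if (newHash != group.metadataHash()) {
        // A topic was created or deleted, or a partition count changed:
        // bump the group epoch and persist the new hash on the epoch record.
        int newEpoch = group.groupEpoch() + 1;
        records.add(newShareGroupEpochRecord(group.groupId(), newEpoch, newHash));
    }

The adjusted member-epoch expectations in ShareGroupHeartbeatRequestTest (patches 2 and 3) track how many group-epoch bumps each topic create/expand/delete now produces under this hash-based detection.
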