KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group #19796

Merged
merged 17 commits into from
Jun 3, 2025

Conversation

FrankYang0529
Member

@FrankYang0529 FrankYang0529 commented May 23, 2025

  • Use metadata hash to replace subscription metadata.
  • Remove ShareGroupPartitionMetadataKey and
    ShareGroupPartitionMetadataValue.
  • Use subscriptionTopicNames and metadataImage to replace
    subscriptionMetadata in subscribedTopicsChangeMap function.

Reviewers: Chia-Ping Tsai, David Jacot, Andrew Schofield
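
As a rough illustration of the first bullet, the sketch below shows the general idea of replacing stored per-topic subscription metadata with a single hash: the group keeps one long derived from its subscribed topics and compares it with a freshly computed value to decide whether the group epoch needs a bump. This is not the code from this PR. The class name, the topic-to-partition-count map, and the use of CRC32 are all assumptions made for the example; the hash function Kafka actually uses is not shown in this conversation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch: the names and the CRC32 choice are illustrative, not Kafka's implementation.
final class MetadataHashSketch {

    // Hash topic names and partition counts in sorted name order,
    // so the result does not depend on the iteration order of the input map.
    static long computeGroupHash(Map<String, Integer> partitionsByTopic) {
        CRC32 crc = new CRC32();
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsByTopic).entrySet()) {
            String line = e.getKey() + ":" + e.getValue() + "\n";
            crc.update(line.getBytes(StandardCharsets.UTF_8));
        }
        return crc.getValue();
    }

    // The epoch only needs a bump when the stored hash differs from the current one.
    static boolean shouldBumpEpoch(long storedHash, long currentHash) {
        return storedHash != currentHash;
    }

    public static void main(String[] args) {
        long before = computeGroupHash(Map.of("orders", 3));
        long after = computeGroupHash(Map.of("orders", 6));
        System.out.println("bump epoch: " + shouldBumpEpoch(before, after));
    }
}
```

The appeal of this shape is that only one long has to be written alongside the group epoch, rather than a separate metadata record per group.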

@github-actions github-actions bot added the tools label May 23, 2025
Signed-off-by: PoAn Yang <[email protected]>
@github-actions github-actions bot added the core Kafka Broker label May 24, 2025
@FrankYang0529 FrankYang0529 changed the title from "KAFKA-17747: Replace subscription metadata with metadata hash in share group (wip)" to "KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group" May 24, 2025
@dajac
Member

dajac commented May 26, 2025

This one should basically be a copy of #19761. Let's merge that one first and then we can review this one.

@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label May 29, 2025
Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. I have only just started reviewing and will have more comments.

Because you are removing ShareGroupPartitionMetadata, I think you should also make some changes to CoordinatorRecordType.java. Essentially, ordinal 9 in the enum is no longer used (and could be reused in the future).

@FrankYang0529
Member Author

@AndrewJSchofield The CoordinatorRecordType.java is automatically generated. Do you mean that we can change ShareGroupMemberMetadata to 9, SHARE_GROUP_METADATA to 10, and so on?

@AndrewJSchofield
Member

Ah, yes, I see. It's fine. One day, someone might notice that 9 is unused and then assign a new record type to it. The way that you've got it is fine. I just had to clone your branch and build it :)
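
To picture what an unused id looks like in practice, here is a small sketch of an id-keyed record type enum with a gap at 9. It is not the generated CoordinatorRecordType.java. The enum name is hypothetical, and the ids 10 and 11 are only inferred from the renumbering question above, so treat them as assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not the generated file; the ids are inferred from the discussion above.
enum RecordTypeSketch {
    // Id 9 is intentionally absent: it stays free for a future record type.
    SHARE_GROUP_MEMBER_METADATA((short) 10),
    SHARE_GROUP_METADATA((short) 11);

    private static final Map<Short, RecordTypeSketch> BY_ID = new HashMap<>();

    static {
        for (RecordTypeSketch type : values()) {
            BY_ID.put(type.id, type);
        }
    }

    private final short id;

    RecordTypeSketch(short id) {
        this.id = id;
    }

    short id() {
        return id;
    }

    // Looks a type up by its persisted id and rejects ids that have no type assigned.
    static RecordTypeSketch fromId(short id) {
        RecordTypeSketch type = BY_ID.get(id);
        if (type == null) {
            throw new IllegalArgumentException("Unknown record id: " + id);
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(fromId((short) 10));
    }
}
```

Because lookups go through the explicit id rather than the enum's position, leaving 9 unassigned does not shift the ids of the remaining types.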

Member

@chia7712 chia7712 left a comment

@FrankYang0529 Thanks for this patch. I left two small comments. PTAL

  // Add group epoch record.
- records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, 0));
+ records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, groupEpoch, metadataHash));
Member

Should we calculate the metadataHash if it is not defined? That would be analogous to how subscriptionMetadata is handled.

Member Author

Thanks for the suggestion. The subscriptionMetadata can be null, so we can tell whether a developer has set the value. However, the metadataHash is a primitive long and cannot be null, so there is no way to tell "unset" apart from an explicit value. I would prefer to let the developer set it manually.
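
The trade-off described here can be sketched as follows. A primitive long field defaults to 0 and cannot signal "never set", whereas an OptionalLong can, which would allow a computed fallback. This is only an illustration of the alternative, not the builder from this PR, which leaves the value to the developer. The class and method names are hypothetical.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

// Hypothetical test-builder sketch; the PR itself keeps a plain long set by the developer.
final class HashBuilderSketch {
    // Empty means "the caller never set a hash"; a bare primitive long could not express that.
    private OptionalLong metadataHash = OptionalLong.empty();

    HashBuilderSketch withMetadataHash(long hash) {
        this.metadataHash = OptionalLong.of(hash);
        return this;
    }

    // Falls back to the supplied computation only when no hash was set explicitly.
    long build(LongSupplier computeDefault) {
        return metadataHash.orElseGet(computeDefault);
    }

    public static void main(String[] args) {
        System.out.println(new HashBuilderSketch().withMetadataHash(7L).build(() -> 42L));
    }
}
```

The cost of this variant is an extra wrapper and one more code path in the builder, which is a reasonable argument for keeping the manual approach.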

if (topicImage != null) {
    Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? alreadyInitialized.get(topicImage.id()).partitions() : Set.of();
    if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) {
        Set<Integer> partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed().collect(Collectors.toSet());
Member

Could you consider using alreadyInitializedPartSet to filter out the partitions from partitionSet?

                    Set<Integer> partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed()
                            .filter(p -> !alreadyInitializedPartSet.contains(p)).collect(Collectors.toSet());

It could be a small optimization, I guess :)
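
The suggested filter can be tried in isolation with the sketch below. It takes a plain partition count and a set of already-initialized partition ids instead of the topicImage and alreadyInitialized structures from the PR, so the class and method names are hypothetical stand-ins.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-alone version of the suggested filter; not the PR's actual method.
final class PartitionFilterSketch {

    // Returns the partition ids in [0, numPartitions) that have not been initialized yet.
    static Set<Integer> partitionsToInitialize(int numPartitions, Set<Integer> alreadyInitialized) {
        return IntStream.range(0, numPartitions)
            .boxed()
            .filter(p -> !alreadyInitialized.contains(p))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(partitionsToInitialize(4, Set.of(0, 1)));
    }
}
```

With an empty alreadyInitialized set the result is every partition, so the filter does not change behavior for topics that have never been initialized.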

@FrankYang0529 FrankYang0529 requested a review from chia7712 June 2, 2025 01:25
Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. Apart from one tiny nit, this looks like a faithful copy of the equivalent changes for consumer groups. I'll download the patch and try it on my local environment, but I expect we'll be good to merge for 4.1.

}

  if (bumpGroupEpoch) {
      groupEpoch += 1;
-     records.add(newShareGroupEpochRecord(groupId, groupEpoch, 0));
+     records.add(newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
      log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
Member

nit: This log includes the metadata hash in consumer groups. Seems sensible here too.

Member Author

Updated it. Thanks.

@FrankYang0529
Member Author

@AndrewJSchofield, there is no assignment function, so we cannot test it the way the consumer group is tested in #19856. A workable approach might be to use the rebalance-total metric to check whether a rebalance happened. Do you have any suggestions for this? I can add it in the next PR.

@dajac
Member

dajac commented Jun 3, 2025

@FrankYang0529 Could we use the share group describe method from the admin client?

@AndrewJSchofield
Member

@FrankYang0529 I think you are planning a follow-on PR with additional testing when rebalance occurs. Apart from that future work, this looks ready to merge.

@AndrewJSchofield AndrewJSchofield merged commit 2977cb1 into apache:trunk Jun 3, 2025
24 checks passed
@FrankYang0529 FrankYang0529 deleted the KAFKA-17747-6 branch June 3, 2025 15:38
@FrankYang0529
Member Author

FrankYang0529 commented Jun 4, 2025

@AndrewJSchofield I tried to add an integration test. However, the GroupMetadataManager doesn't support using other assignors for share groups. Currently, withShareGroupAssignor is not called anywhere in production code, so share groups can only use SimpleAssignor.

The consumer group uses group.consumer.assignors to define assignors. Do we want to add a new config, group.share.assignors, for share groups? I can add this in the next PR. Thanks.

Builder withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupAssignor) {
    this.shareGroupAssignor = shareGroupAssignor;
    return this;
}

if (shareGroupAssignor == null)
    shareGroupAssignor = new SimpleAssignor();

@AndrewJSchofield
Member

Ah, yes, good catch. We missed this config. I have created a specific issue https://issues.apache.org/jira/browse/KAFKA-19369. Given that KIP-932 does not permit a list of assignors, the only thing a user could do is implement their own assignor and replace the simple one. We've essentially hard-coded the SimpleAssignor with no way to override it. When we add more assignors in the future, the plan was to enhance the support for this config to support a list, just as for consumer groups.
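
One way such an override could work is sketched below: read a single assignor class name from configuration and fall back to the simple assignor when nothing is configured. This is only a guess at the shape of the follow-up tracked in KAFKA-19369, not its implementation. The interface and classes are hypothetical stand-ins for the real assignor types, and the config key is the name proposed above.

```java
import java.util.Map;

// Hypothetical sketch of a config-driven assignor override; not Kafka's implementation.
final class AssignorConfigSketch {
    // Config name as proposed in this conversation.
    static final String ASSIGNORS_CONFIG = "group.share.assignors";

    // Stand-in for the real share group assignor interface.
    interface ShareAssignor {
        String name();
    }

    static final class SimpleAssignorSketch implements ShareAssignor {
        public String name() {
            return "simple";
        }
    }

    static final class CustomAssignorSketch implements ShareAssignor {
        public String name() {
            return "custom";
        }
    }

    // Instantiates the configured assignor class, or the simple assignor when none is configured.
    static ShareAssignor resolve(Map<String, String> config) {
        String className = config.get(ASSIGNORS_CONFIG);
        if (className == null || className.trim().isEmpty()) {
            return new SimpleAssignorSketch();
        }
        try {
            return Class.forName(className.trim())
                .asSubclass(ShareAssignor.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load assignor: " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()).name());
    }
}
```

Accepting a single class name matches the constraint mentioned above that KIP-932 does not permit a list; widening the value to a list later would only change how the config string is parsed.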

Labels
core Kafka Broker KIP-932 Queues for Kafka tools
4 participants