Skip to content

Commit a6dfde7

Browse files
authored
KAFKA-18629: Utilize share group partition metadata for delete group. (apache#19363)
* Currently, the delete share group code flow uses `group.subscribedTopicNames()` to fetch information about all the share partitions to which a share group is subscribed to. * However, this is incorrect since once the group is EMPTY, a precondition for delete, the aforementioned method will return an empty list. * In this PR we have leveraged the `ShareGroupStatePartitionMetadata` record to grab the `initialized` and `initializing` partitions to build the delete candidates, which remedies the situation. Reviewers: Andrew Schofield <[email protected]>
1 parent c11938c commit a6dfde7

File tree

10 files changed

+232
-117
lines changed

10 files changed

+232
-117
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java

+15
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,21 @@ public static CoordinatorRecord newShareGroupCurrentAssignmentTombstoneRecord(
814814
);
815815
}
816816

817+
/**
818+
* Creates a ShareGroupStatePartitionMetadata tombstone.
819+
*
820+
* @param groupId The share group id.
821+
* @return The record.
822+
*/
823+
public static CoordinatorRecord newShareGroupStatePartitionMetadataTombstoneRecord(
824+
String groupId
825+
) {
826+
return CoordinatorRecord.tombstone(
827+
new ShareGroupStatePartitionMetadataKey()
828+
.setGroupId(groupId)
829+
);
830+
}
831+
817832
/**
818833
* Creates a ShareGroupStatePartitionMetadata record.
819834
*

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

+22-12
Original file line numberDiff line numberDiff line change
@@ -1073,24 +1073,25 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
10731073

10741074
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
10751075
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap -> {
1076-
DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
1077-
List<String> retainedGroupIds = deleteCandidateGroupIds(groupErrMap, groupList, collection);
1076+
DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults = new DeleteGroupsResponseData.DeletableGroupResultCollection();
1077+
List<String> retainedGroupIds = updateResponseAndGetNonErrorGroupList(groupErrMap, groupList, deletableGroupResults);
10781078
if (retainedGroupIds.isEmpty()) {
1079-
return CompletableFuture.completedFuture(collection);
1079+
return CompletableFuture.completedFuture(deletableGroupResults);
10801080
}
10811081

10821082
return handleDeleteGroups(context, topicPartition, retainedGroupIds)
1083-
.whenComplete((resp, __) -> resp.forEach(result -> collection.add(result.duplicate())))
1084-
.thenApply(__ -> collection);
1083+
.whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add(result.duplicate())))
1084+
.thenApply(__ -> deletableGroupResults);
10851085
});
10861086
// deleteShareGroups has its own exceptionally block, so we don't need one here.
10871087

10881088
// This future object has the following stages:
10891089
// - First it invokes the share group delete flow where the shard sharePartitionDeleteRequests
10901090
// method is invoked, and it returns request objects for each valid share group passed to it.
1091+
// All initialized and initializing share partitions are moved to deleting.
10911092
// - Then the requests are passed to the persister.deleteState method one at a time. The results
10921093
// are collated as a Map of groupId -> persister errors
1093-
// - The above map is then used to decide whether to invoke the group coordinator delete groups logic
1094+
// - The above map can be used to decide whether to invoke the group coordinator delete groups logic
10941095
// - Share groups with failed persister delete are NOT CONSIDERED for group coordinator delete.
10951096
// TLDR: DeleteShareGroups -> filter erroneous persister deletes -> general delete groups logic
10961097
futures.add(future);
@@ -1102,17 +1103,26 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
11021103
(accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
11031104
}
11041105

1105-
private List<String> deleteCandidateGroupIds(
1106-
Map<String, Errors> groupErrMap,
1106+
/**
1107+
* Processes input shareGroupErrMap by retaining only those which do not contain an error.
1108+
* Also updates the result collection input arg with share groups containing errors.
1109+
*
1110+
* @param shareGroupErrMap Map keyed on share groupId and value as the error (NONE for no error).
1111+
* @param groupList Entire list of groups (all types)
1112+
* @param deletableGroupResults Collection of responses for delete groups request.
1113+
* @return A list of all non-error groupIds
1114+
*/
1115+
private List<String> updateResponseAndGetNonErrorGroupList(
1116+
Map<String, Errors> shareGroupErrMap,
11071117
List<String> groupList,
1108-
DeleteGroupsResponseData.DeletableGroupResultCollection collection
1118+
DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults
11091119
) {
11101120
List<String> errGroupIds = new ArrayList<>();
1111-
groupErrMap.forEach((groupId, error) -> {
1121+
shareGroupErrMap.forEach((groupId, error) -> {
11121122
if (error.code() != Errors.NONE.code()) {
11131123
log.error("Error deleting share group {} due to error {}", groupId, error);
11141124
errGroupIds.add(groupId);
1115-
collection.add(
1125+
deletableGroupResults.add(
11161126
new DeleteGroupsResponseData.DeletableGroupResult()
11171127
.setGroupId(groupId)
11181128
.setErrorCode(error.code())
@@ -1153,7 +1163,7 @@ private CompletableFuture<Map<String, Errors>> deleteShareGroups(
11531163
TopicPartition topicPartition,
11541164
List<String> groupList
11551165
) {
1156-
// topicPartition refers to internal topic __consumer_offsets
1166+
// topicPartition refers to internal topic __consumer_offsets.
11571167
return runtime.scheduleWriteOperation(
11581168
"delete-share-groups",
11591169
topicPartition,

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -589,18 +589,19 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
589589
* Method returns a Map keyed on groupId and value as pair of {@link DeleteShareGroupStateParameters}
590590
* and any ERRORS while building the request corresponding
591591
* to the valid share groups passed as the input.
592-
* <p></p>
592+
* <p>
593593
* The groupIds are first filtered by type to restrict the list to share groups.
594594
* @param groupIds - A list of groupIds as string
595-
* @return {@link CoordinatorResult} object always containing empty records and Map keyed on groupId and value pair (req, error)
595+
* @return A result object containing a map keyed on groupId and value pair (req, error) and related coordinator records.
596596
*/
597597
public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> sharePartitionDeleteRequests(List<String> groupIds) {
598598
Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> responseMap = new HashMap<>();
599+
List<CoordinatorRecord> records = new ArrayList<>();
599600
for (String groupId : groupIds) {
600601
try {
601602
ShareGroup group = groupMetadataManager.shareGroup(groupId);
602603
group.validateDeleteGroup();
603-
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(group)
604+
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records)
604605
.ifPresent(req -> responseMap.put(groupId, Map.entry(req, Errors.NONE)));
605606
} catch (GroupIdNotFoundException exception) {
606607
log.debug("GroupId {} not found as a share group.", groupId);
@@ -609,7 +610,7 @@ public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters,
609610
responseMap.put(groupId, Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, Errors.forException(exception)));
610611
}
611612
}
612-
return new CoordinatorResult<>(List.of(), responseMap);
613+
return new CoordinatorResult<>(records, responseMap);
613614
}
614615

615616
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+46-29
Original file line numberDiff line numberDiff line change
@@ -2984,7 +2984,7 @@ private void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord
29842984
groupId,
29852985
attachTopicName(finalInitializingMap),
29862986
attachTopicName(currentMap.initializedTopics()),
2987-
Map.of()
2987+
attachTopicName(currentMap.deletingTopics())
29882988
)
29892989
);
29902990
}
@@ -4979,7 +4979,7 @@ public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(
49794979
group.groupId(),
49804980
attachTopicName(finalInitializingMap),
49814981
attachTopicName(finalInitializedMap),
4982-
Map.of()
4982+
attachTopicName(currentMap.deletingTopics())
49834983
)),
49844984
null
49854985
);
@@ -5025,7 +5025,7 @@ public CoordinatorResult<Void, CoordinatorRecord> uninitializeShareGroupState(
50255025
groupId,
50265026
attachTopicName(finalInitializingTopics),
50275027
attachTopicName(info.initializedTopics()),
5028-
Map.of()
5028+
attachTopicName(info.deletingTopics())
50295029
)
50305030
),
50315031
null
@@ -5057,6 +5057,13 @@ public List<InitializeShareGroupStateParameters> reconcileShareGroupStateInitial
50575057
return requests;
50585058
}
50595059

5060+
private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) {
5061+
TopicsImage topicsImage = metadataImage.topics();
5062+
return topicIds.stream()
5063+
.map(topicId -> Map.entry(topicId, topicsImage.getTopic(topicId).name()))
5064+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
5065+
}
5066+
50605067
private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) {
50615068
TopicsImage topicsImage = metadataImage.topics();
50625069
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
@@ -8142,39 +8149,49 @@ public void createGroupTombstoneRecords(
81428149
/**
81438150
* Returns an optional of delete share group request object to be used with the persister.
81448151
* Empty if no subscribed topics or if the share group is empty.
8145-
* @param shareGroup - A share group
8152+
* @param shareGroupId Share group id
8153+
* @param records List of coordinator records to append to
81468154
* @return Optional of object representing the share group state delete request.
81478155
*/
8148-
public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) {
8149-
TopicsImage topicsImage = metadataImage.topics();
8150-
Set<String> subscribedTopics = shareGroup.subscribedTopicNames().keySet();
8151-
List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(subscribedTopics.size());
8152-
8153-
for (String topic : subscribedTopics) {
8154-
TopicImage topicImage = topicsImage.getTopic(topic);
8155-
topicDataList.add(
8156-
new TopicData<>(
8157-
topicImage.id(),
8158-
topicImage.partitions().keySet().stream()
8159-
.map(PartitionFactory::newPartitionIdData)
8160-
.toList()
8161-
)
8162-
);
8156+
public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(String shareGroupId, List<CoordinatorRecord> records) {
8157+
if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) {
8158+
return Optional.empty();
81638159
}
81648160

8165-
if (topicDataList.isEmpty()) {
8161+
Map<Uuid, Set<Integer>> deleteCandidates = mergeShareGroupInitMaps(
8162+
shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(),
8163+
shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
8164+
);
8165+
8166+
if (deleteCandidates.isEmpty()) {
81668167
return Optional.empty();
81678168
}
81688169

8169-
return Optional.of(
8170-
new DeleteShareGroupStateParameters.Builder()
8171-
.setGroupTopicPartitionData(
8172-
new GroupTopicPartitionData.Builder<PartitionIdData>()
8173-
.setGroupId(shareGroup.groupId())
8174-
.setTopicsData(topicDataList)
8175-
.build()
8176-
)
8177-
.build()
8170+
List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(deleteCandidates.size());
8171+
8172+
for (Map.Entry<Uuid, Set<Integer>> entry : deleteCandidates.entrySet()) {
8173+
topicDataList.add(new TopicData<>(
8174+
entry.getKey(),
8175+
entry.getValue().stream()
8176+
.map(PartitionFactory::newPartitionIdData)
8177+
.toList()
8178+
));
8179+
}
8180+
8181+
// Remove all initializing and initialized topic info from record and add deleting.
8182+
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
8183+
shareGroupId,
8184+
Map.of(),
8185+
Map.of(),
8186+
attachTopicName(deleteCandidates.keySet())
8187+
));
8188+
8189+
return Optional.of(new DeleteShareGroupStateParameters.Builder()
8190+
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
8191+
.setGroupId(shareGroupId)
8192+
.setTopicsData(topicDataList)
8193+
.build())
8194+
.build()
81788195
);
81798196
}
81808197

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java

+1
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
236236
);
237237

238238
records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId()));
239+
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId()));
239240
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId()));
240241
}
241242

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
3232
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
3333
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
34+
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
3435
import org.apache.kafka.server.common.ApiMessageAndVersion;
3536

3637
import org.opentest4j.AssertionFailedError;
@@ -60,7 +61,8 @@ public class Assertions {
6061
ConsumerGroupPartitionMetadataValue.class, Assertions::assertConsumerGroupPartitionMetadataValue,
6162
GroupMetadataValue.class, Assertions::assertGroupMetadataValue,
6263
ConsumerGroupTargetAssignmentMemberValue.class, Assertions::assertConsumerGroupTargetAssignmentMemberValue,
63-
ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue
64+
ShareGroupPartitionMetadataValue.class, Assertions::assertShareGroupPartitionMetadataValue,
65+
ShareGroupStatePartitionMetadataValue.class, Assertions::assertShareGroupStatePartitionMetadataValue
6466
);
6567

6668
public static void assertResponseEquals(
@@ -285,6 +287,33 @@ private static void assertShareGroupPartitionMetadataValue(
285287
assertEquals(expected, actual);
286288
}
287289

290+
private static void assertShareGroupStatePartitionMetadataValue(
291+
ApiMessage exp,
292+
ApiMessage act
293+
) {
294+
ShareGroupStatePartitionMetadataValue expected = (ShareGroupStatePartitionMetadataValue) exp.duplicate();
295+
ShareGroupStatePartitionMetadataValue actual = (ShareGroupStatePartitionMetadataValue) act.duplicate();
296+
297+
Consumer<ShareGroupStatePartitionMetadataValue> normalize = message -> {
298+
message.initializedTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
299+
message.initializedTopics().forEach(topic -> {
300+
topic.partitions().sort(Comparator.naturalOrder());
301+
});
302+
303+
message.initializingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
304+
message.initializingTopics().forEach(topic -> {
305+
topic.partitions().sort(Comparator.naturalOrder());
306+
});
307+
308+
message.deletingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId));
309+
};
310+
311+
normalize.accept(expected);
312+
normalize.accept(actual);
313+
314+
assertEquals(expected, actual);
315+
}
316+
288317
private static void assertGroupMetadataValue(
289318
ApiMessage exp,
290319
ApiMessage act

0 commit comments

Comments
 (0)