Skip to content

Commit 1df4a42

Browse files
authored
KAFKA-18916; Resolved regular expressions must update the group by topics data structure (apache#19088)
When regular expressions are resolved, they do not update the group by topics data structure. Hence, topic changes (e.g. deletion) do not trigger a rebalance of the group. Reviewers: Lucas Brutschy <[email protected]>
1 parent 101e15b commit 1df4a42

File tree

3 files changed

+58
-11
lines changed

3 files changed

+58
-11
lines changed

core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature,
2020
import kafka.utils.TestUtils
2121
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
2222
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
23-
import org.apache.kafka.common.Uuid
23+
import org.apache.kafka.common.{TopicCollection, Uuid}
2424
import org.apache.kafka.common.config.ConfigResource
2525
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
2626
import org.apache.kafka.common.protocol.Errors
@@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
174174
)
175175

176176
// Heartbeat request to join the group. Note that the member subscribes
177-
// to an nonexistent topic.
177+
// to a nonexistent topic.
178178
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
179179
new ConsumerGroupHeartbeatRequestData()
180180
.setGroupId("grp")
@@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
214214
).build()
215215

216216
// This is the expected assignment.
217-
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
217+
var expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
218218
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
219219
.setTopicId(topicId)
220220
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
@@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
230230
// Verify the response.
231231
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
232232
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
233+
234+
// Delete the topic.
235+
admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get
236+
237+
// Prepare the next heartbeat.
238+
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
239+
new ConsumerGroupHeartbeatRequestData()
240+
.setGroupId("grp")
241+
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
242+
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
243+
).build()
244+
245+
// This is the expected assignment.
246+
expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
247+
248+
// Heartbeats until the partitions are revoked.
249+
consumerGroupHeartbeatResponse = null
250+
TestUtils.waitUntilTrue(() => {
251+
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
252+
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
253+
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
254+
}, msg = s"Could not get partitions revoked. Last response $consumerGroupHeartbeatResponse.")
255+
256+
// Verify the response.
257+
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
258+
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
233259
} finally {
234260
admin.close()
235261
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4308,9 +4308,18 @@ public void replay(
43084308
String groupId = key.groupId();
43094309
String regex = key.regularExpression();
43104310

4311+
ConsumerGroup consumerGroup;
4312+
try {
4313+
consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
4314+
} catch (GroupIdNotFoundException ex) {
4315+
// If the group does not exist and a tombstone is replayed, we can ignore it.
4316+
return;
4317+
}
4318+
4319+
Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames().keySet());
4320+
43114321
if (value != null) {
4312-
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true);
4313-
group.updateResolvedRegularExpression(
4322+
consumerGroup.updateResolvedRegularExpression(
43144323
regex,
43154324
new ResolvedRegularExpression(
43164325
new HashSet<>(value.topics()),
@@ -4319,13 +4328,10 @@ public void replay(
43194328
)
43204329
);
43214330
} else {
4322-
try {
4323-
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
4324-
group.removeResolvedRegularExpression(regex);
4325-
} catch (GroupIdNotFoundException ex) {
4326-
// If the group does not exist, we can ignore the tombstone.
4327-
}
4331+
consumerGroup.removeResolvedRegularExpression(regex);
43284332
}
4333+
4334+
updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet());
43294335
}
43304336

43314337
/**

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16076,6 +16076,11 @@ public void testReplayConsumerGroupRegularExpression() {
1607616076
Optional.of(resolvedRegularExpression),
1607716077
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
1607816078
);
16079+
16080+
assertEquals(
16081+
Set.of("foo"),
16082+
context.groupMetadataManager.groupsSubscribedToTopic("abc")
16083+
);
1607916084
}
1608016085

1608116086
@Test
@@ -16101,6 +16106,11 @@ public void testReplayConsumerGroupRegularExpressionTombstone() {
1610116106
resolvedRegularExpression
1610216107
));
1610316108

16109+
assertEquals(
16110+
Set.of("foo"),
16111+
context.groupMetadataManager.groupsSubscribedToTopic("abc")
16112+
);
16113+
1610416114
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
1610516115
"foo",
1610616116
"abc*"
@@ -16110,6 +16120,11 @@ public void testReplayConsumerGroupRegularExpressionTombstone() {
1611016120
Optional.empty(),
1611116121
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
1611216122
);
16123+
16124+
assertEquals(
16125+
Set.of(),
16126+
context.groupMetadataManager.groupsSubscribedToTopic("abc")
16127+
);
1611316128
}
1611416129

1611516130
@Test

0 commit comments

Comments
 (0)