Skip to content

Commit 99f871a

Browse files
MINOR: Consolidated message formatter for share group records (apache#19435)
Create a single formatter for use with `kafka-console-consumer.sh` that formats all record types for share groups on the `__consumer_offsets` topic.
1 parent 34a87d3 commit 99f871a

File tree

3 files changed

+265
-5
lines changed

3 files changed

+265
-5
lines changed
Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,33 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
18-
package org.apache.kafka.tools.consumer.group.share;
17+
package org.apache.kafka.tools.consumer;
1918

2019
import org.apache.kafka.common.protocol.ApiMessage;
2120
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
2221
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
2322
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
24-
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
2523

2624
import com.fasterxml.jackson.databind.JsonNode;
2725

2826
import java.util.Set;
2927

30-
public class ShareGroupStatePartitionMetadataFormatter extends CoordinatorRecordMessageFormatter {
28+
/**
29+
* Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
30+
* Formats the records on the __consumer_offsets topic which pertain to share groups.
31+
*/
32+
public class ShareGroupMessageFormatter extends CoordinatorRecordMessageFormatter {
3133
private static final Set<Short> ALLOWED_RECORDS = Set.of(
34+
CoordinatorRecordType.SHARE_GROUP_PARTITION_METADATA.id(),
35+
CoordinatorRecordType.SHARE_GROUP_MEMBER_METADATA.id(),
36+
CoordinatorRecordType.SHARE_GROUP_METADATA.id(),
37+
CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_METADATA.id(),
38+
CoordinatorRecordType.SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER.id(),
39+
CoordinatorRecordType.SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT.id(),
3240
CoordinatorRecordType.SHARE_GROUP_STATE_PARTITION_METADATA.id()
3341
);
3442

35-
public ShareGroupStatePartitionMetadataFormatter() {
43+
public ShareGroupMessageFormatter() {
3644
super(new GroupCoordinatorRecordSerde());
3745
}
3846

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.tools.consumer;
18+
19+
import org.apache.kafka.common.Uuid;
20+
import org.apache.kafka.common.protocol.MessageUtil;
21+
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
22+
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
23+
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
24+
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
25+
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
26+
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
27+
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
28+
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
29+
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
30+
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
31+
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
32+
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
33+
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
34+
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
35+
36+
import org.junit.jupiter.params.provider.Arguments;
37+
38+
import java.util.List;
39+
import java.util.stream.Stream;
40+
41+
public class ShareGroupMessageFormatterTest extends CoordinatorRecordMessageFormatterTest {
42+
43+
private static final ShareGroupPartitionMetadataKey SHARE_GROUP_PARTITION_METADATA_KEY = new ShareGroupPartitionMetadataKey()
44+
.setGroupId("group-id");
45+
private static final ShareGroupPartitionMetadataValue SHARE_GROUP_PARTITION_METADATA_VALUE = new ShareGroupPartitionMetadataValue()
46+
.setTopics(List.of(new ShareGroupPartitionMetadataValue.TopicMetadata()
47+
.setTopicId(Uuid.ONE_UUID)
48+
.setTopicName("topic")
49+
.setNumPartitions(1)
50+
.setPartitionMetadata(List.of()))
51+
);
52+
private static final ShareGroupMemberMetadataKey SHARE_GROUP_MEMBER_METADATA_KEY = new ShareGroupMemberMetadataKey()
53+
.setGroupId("group-id")
54+
.setMemberId("member-id");
55+
private static final ShareGroupMemberMetadataValue SHARE_GROUP_MEMBER_METADATA_VALUE = new ShareGroupMemberMetadataValue()
56+
.setRackId("rack-a")
57+
.setClientId("client-id")
58+
.setClientHost("1.2.3.4")
59+
.setSubscribedTopicNames(List.of("topic"));
60+
private static final ShareGroupMetadataKey SHARE_GROUP_METADATA_KEY = new ShareGroupMetadataKey()
61+
.setGroupId("group-id");
62+
private static final ShareGroupMetadataValue SHARE_GROUP_METADATA_VALUE = new ShareGroupMetadataValue()
63+
.setEpoch(1);
64+
private static final ShareGroupTargetAssignmentMetadataKey SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_KEY = new ShareGroupTargetAssignmentMetadataKey()
65+
.setGroupId("group-id");
66+
private static final ShareGroupTargetAssignmentMetadataValue SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_VALUE = new ShareGroupTargetAssignmentMetadataValue()
67+
.setAssignmentEpoch(1);
68+
private static final ShareGroupTargetAssignmentMemberKey SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER_KEY = new ShareGroupTargetAssignmentMemberKey()
69+
.setGroupId("group-id")
70+
.setMemberId("member-id");
71+
private static final ShareGroupTargetAssignmentMemberValue SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER_VALUE = new ShareGroupTargetAssignmentMemberValue()
72+
.setTopicPartitions(List.of(new ShareGroupTargetAssignmentMemberValue.TopicPartition()
73+
.setTopicId(Uuid.ONE_UUID)
74+
.setPartitions(List.of(0, 1)))
75+
);
76+
private static final ShareGroupCurrentMemberAssignmentKey SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT_KEY = new ShareGroupCurrentMemberAssignmentKey()
77+
.setGroupId("group-id")
78+
.setMemberId("member-id");
79+
private static final ShareGroupCurrentMemberAssignmentValue SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT_VALUE = new ShareGroupCurrentMemberAssignmentValue()
80+
.setMemberEpoch(1)
81+
.setPreviousMemberEpoch(0)
82+
.setState((byte) 0)
83+
.setAssignedPartitions(List.of(new ShareGroupCurrentMemberAssignmentValue.TopicPartitions()
84+
.setTopicId(Uuid.ONE_UUID)
85+
.setPartitions(List.of(0, 1)))
86+
);
87+
private static final ShareGroupStatePartitionMetadataKey SHARE_GROUP_STATE_PARTITION_METADATA_KEY = new ShareGroupStatePartitionMetadataKey()
88+
.setGroupId("group-id");
89+
private static final ShareGroupStatePartitionMetadataValue SHARE_GROUP_STATE_PARTITION_METADATA_VALUE = new ShareGroupStatePartitionMetadataValue()
90+
.setInitializingTopics(List.of(new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
91+
.setTopicId(Uuid.ONE_UUID)
92+
.setTopicName("topic")
93+
.setPartitions(List.of(1)))
94+
)
95+
.setInitializedTopics(List.of(new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
96+
.setTopicId(Uuid.ONE_UUID)
97+
.setTopicName("topic")
98+
.setPartitions(List.of(0)))
99+
)
100+
.setDeletingTopics(List.of(new ShareGroupStatePartitionMetadataValue.TopicInfo()
101+
.setTopicId(Uuid.ONE_UUID)
102+
.setTopicName("topic"))
103+
);
104+
105+
@Override
106+
protected CoordinatorRecordMessageFormatter formatter() {
107+
return new ShareGroupMessageFormatter();
108+
}
109+
110+
@Override
111+
protected Stream<Arguments> parameters() {
112+
return Stream.of(
113+
Arguments.of(
114+
MessageUtil.toVersionPrefixedByteBuffer((short) 9, SHARE_GROUP_PARTITION_METADATA_KEY).array(),
115+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_PARTITION_METADATA_VALUE).array(),
116+
"""
117+
{"key":{"type":9,"data":{"groupId":"group-id"}},
118+
"value":{"version":0,
119+
"data":{"topics":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ",
120+
"topicName":"topic",
121+
"numPartitions":1,
122+
"partitionMetadata":[]}]}}}
123+
"""
124+
),
125+
Arguments.of(
126+
MessageUtil.toVersionPrefixedByteBuffer((short) 9, SHARE_GROUP_PARTITION_METADATA_KEY).array(),
127+
null,
128+
"""
129+
{"key":{"type":9,"data":{"groupId":"group-id"}},"value":null}
130+
"""
131+
),
132+
Arguments.of(
133+
MessageUtil.toVersionPrefixedByteBuffer((short) 10, SHARE_GROUP_MEMBER_METADATA_KEY).array(),
134+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_MEMBER_METADATA_VALUE).array(),
135+
"""
136+
{"key":{"type":10,"data":{"groupId":"group-id","memberId":"member-id"}},
137+
"value":{"version":0,
138+
"data":{"rackId":"rack-a",
139+
"clientId":"client-id",
140+
"clientHost":"1.2.3.4",
141+
"subscribedTopicNames":["topic"]}}}
142+
"""
143+
),
144+
Arguments.of(
145+
MessageUtil.toVersionPrefixedByteBuffer((short) 10, SHARE_GROUP_MEMBER_METADATA_KEY).array(),
146+
null,
147+
"""
148+
{"key":{"type":10,"data":{"groupId":"group-id","memberId":"member-id"}},"value":null}
149+
"""
150+
),
151+
Arguments.of(
152+
MessageUtil.toVersionPrefixedByteBuffer((short) 11, SHARE_GROUP_METADATA_KEY).array(),
153+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_METADATA_VALUE).array(),
154+
"""
155+
{"key":{"type":11,"data":{"groupId":"group-id"}},
156+
"value":{"version":0,
157+
"data":{"epoch":1}}}
158+
"""
159+
),
160+
Arguments.of(
161+
MessageUtil.toVersionPrefixedByteBuffer((short) 11, SHARE_GROUP_METADATA_KEY).array(),
162+
null,
163+
"""
164+
{"key":{"type":11,"data":{"groupId":"group-id"}},"value":null}
165+
"""
166+
),
167+
Arguments.of(
168+
MessageUtil.toVersionPrefixedByteBuffer((short) 12, SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_KEY).array(),
169+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_VALUE).array(),
170+
"""
171+
{"key":{"type":12,"data":{"groupId":"group-id"}},
172+
"value":{"version":0,
173+
"data":{"assignmentEpoch":1}}}
174+
"""
175+
),
176+
Arguments.of(
177+
MessageUtil.toVersionPrefixedByteBuffer((short) 12, SHARE_GROUP_TARGET_ASSIGNMENT_METADATA_KEY).array(),
178+
null,
179+
"""
180+
{"key":{"type":12,"data":{"groupId":"group-id"}},"value":null}
181+
"""
182+
),
183+
Arguments.of(
184+
MessageUtil.toVersionPrefixedByteBuffer((short) 13, SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER_KEY).array(),
185+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER_VALUE).array(),
186+
"""
187+
{"key":{"type":13,"data":{"groupId":"group-id","memberId":"member-id"}},
188+
"value":{"version":0,
189+
"data":{"topicPartitions":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ",
190+
"partitions":[0,1]}]}}}
191+
"""
192+
),
193+
Arguments.of(
194+
MessageUtil.toVersionPrefixedByteBuffer((short) 13, SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER_KEY).array(),
195+
null,
196+
"""
197+
{"key":{"type":13,"data":{"groupId":"group-id","memberId":"member-id"}},"value":null}
198+
"""
199+
),
200+
Arguments.of(
201+
MessageUtil.toVersionPrefixedByteBuffer((short) 14, SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT_KEY).array(),
202+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT_VALUE).array(),
203+
"""
204+
{"key":{"type":14,"data":{"groupId":"group-id","memberId":"member-id"}},
205+
"value":{"version":0,
206+
"data":{"memberEpoch":1,
207+
"previousMemberEpoch":0,
208+
"state":0,
209+
"assignedPartitions":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ",
210+
"partitions":[0,1]}]}}}
211+
"""
212+
),
213+
Arguments.of(
214+
MessageUtil.toVersionPrefixedByteBuffer((short) 14, SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT_KEY).array(),
215+
null,
216+
"""
217+
{"key":{"type":14,"data":{"groupId":"group-id","memberId":"member-id"}},"value":null}
218+
"""
219+
),
220+
Arguments.of(
221+
MessageUtil.toVersionPrefixedByteBuffer((short) 15, SHARE_GROUP_STATE_PARTITION_METADATA_KEY).array(),
222+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_GROUP_STATE_PARTITION_METADATA_VALUE).array(),
223+
"""
224+
{"key":{"type":15,"data":{"groupId":"group-id"}},
225+
"value":{"version":0,
226+
"data":{"initializingTopics":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ",
227+
"topicName":"topic",
228+
"partitions":[1]}],
229+
"initializedTopics":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ",
230+
"topicName":"topic",
231+
"partitions":[0]}],
232+
"deletingTopics":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ",
233+
"topicName":"topic"}]}}}
234+
"""
235+
),
236+
Arguments.of(
237+
MessageUtil.toVersionPrefixedByteBuffer((short) 15, SHARE_GROUP_STATE_PARTITION_METADATA_KEY).array(),
238+
null,
239+
"""
240+
{"key":{"type":15,"data":{"groupId":"group-id"}},"value":null}
241+
"""
242+
)
243+
);
244+
}
245+
}

tools/src/test/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatterTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,13 @@ protected Stream<Arguments> parameters() {
159159
{"firstOffset":201,"lastOffset":210,"deliveryState":2,"deliveryCount":10}]}}}
160160
"""
161161
),
162+
Arguments.of(
163+
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_KEY).array(),
164+
null,
165+
"""
166+
{"key":{"type":0,"data":{"groupId":"gs1","topicId":"gtb2stGYRk-vWZ2zAozmoA","partition":0}},"value":null}
167+
"""
168+
),
162169
Arguments.of(
163170
MessageUtil.toVersionPrefixedByteBuffer((short) 1, SHARE_UPDATE_KEY).array(),
164171
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),

0 commit comments

Comments
 (0)