-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-17747: [7/N] Add integration test for rack aware assignment #19856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Signed-off-by: PoAn Yang <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Thanks for the PR. I left a few comments.
// RackId - set if present | ||
membershipManager.rackId().ifPresent(data::setRackId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it should only be set if it did not change since the last request or if we send a full request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated it. Thanks.
@@ -142,7 +142,7 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro | |||
|
|||
private ConsumerMembershipManager createMembershipManager(String groupInstanceId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's extend unit tests to cover the rack field.
// The WorkerGroupMember in connect module also uses this class, but there is no client.rack in DistributedConfig. | ||
// Ignore the rackId in that case to avoid ConfigException. | ||
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string. | ||
// Skip empty rackId to avoid InvalidRequestException. | ||
if (config.values().containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG) && !config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty()) { | ||
this.rackId = Optional.ofNullable(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG)); | ||
} else { | ||
this.rackId = Optional.empty(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Should we only do this if the protocolType is CONSUMER?
- It may be simpler to get the config in a local variable and to set this.rack if non-null and non-empty;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I change to do this when protocol type is CONSUMER or SHARE, because they all support the field.
@@ -216,6 +222,92 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception { | |||
} | |||
} | |||
|
|||
@ClusterTest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether we could add another test which verifies that a rebalance is triggered when the racks of a partition has changed. Have you considered it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I added it to the same case. It uses alterPartitionReassignments
to change partitions to different brokers and make sure consumers get new assignments.
Signed-off-by: PoAn Yang <[email protected]>
RackAwareAssignor
. It usesracksForPartition
to check the rackid of a partition and assign it to a member which has the same rack id.
ConsumerIntegrationTest#testRackAwareAssignment
to checkracksForPartition
works correctly.