Skip to content

Commit c465abc

Browse files
authored
KAFKA-19130: Do not add fenced brokers to BrokerRegistrationTracker on startup (apache#19454)
When the controller starts up (or becomes active after being inactive), we add all of the registered brokers to BrokerRegistrationTracker so that they will not be accidentally fenced the next time we are looking for a broker to fence. We do this because the state in BrokerRegistrationTracker is "soft state" (it doesn't appear in the metadata log), and the newly active controller starts off with no soft state. (Its soft state will be populated by the brokers sending heartbeat requests to it over time.) In the case of fenced brokers, we are not worried about accidentally fencing the broker due to it being missing from BrokerRegistrationTracker for a while (it's already fenced). Therefore, it should be reasonable to just not add fenced brokers to the tracker initially. One case where this change will have a positive impact is for people running single-node demonstration clusters in combined KRaft mode. In that case, when the single-node cluster is taken down and restarted, it currently will have to wait about 9 seconds for the broker to come up and re-register. With this change, the broker should be able to re-register immediately (assuming the previous shutdown happened cleanly through controller shutdown.) One possible negative impact is that if there is a controller failover, it will open a small window where a broker with the same ID as a fenced broker could re-register. However, our detection of duplicate broker IDs is best-effort (and duplicate broker IDs are an administrative mistake), so this downside seems acceptable. Reviewers: Alyssa Huang <[email protected]>, José Armando García Sancio <[email protected]>
1 parent 23e7158 commit c465abc

File tree

2 files changed

+28
-26
lines changed

2 files changed

+28
-26
lines changed

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,10 @@ public void activate() {
309309
long nowNs = time.nanoseconds();
310310
for (BrokerRegistration registration : brokerRegistrations.values()) {
311311
heartbeatManager.register(registration.id(), registration.fenced());
312-
heartbeatManager.tracker().updateContactTime(
313-
new BrokerIdAndEpoch(registration.id(), registration.epoch()), nowNs);
312+
if (!registration.fenced()) {
313+
heartbeatManager.tracker().updateContactTime(
314+
new BrokerIdAndEpoch(registration.id(), registration.epoch()), nowNs);
315+
}
314316
}
315317
}
316318

@@ -353,9 +355,9 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
353355
if (existing != null) {
354356
prevIncarnationId = existing.incarnationId();
355357
storedBrokerEpoch = existing.epoch();
356-
if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && !existing.inControlledShutdown()) {
358+
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
357359
if (!request.incarnationId().equals(prevIncarnationId)) {
358-
throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
360+
throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
359361
"was recently restarted this should self-resolve once the heartbeat manager expires the broker's session.");
360362
}
361363
}

metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -968,9 +968,10 @@ public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
968968
clusterControl.replay(new RegisterBrokerRecord().
969969
setBrokerEpoch(123).
970970
setBrokerId(1).
971+
setFenced(false).
971972
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))), 10005);
972973
clusterControl.activate();
973-
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
974+
assertEquals(OptionalLong.empty(), clusterControl.heartbeatManager().tracker().
974975
contactTime(new BrokerIdAndEpoch(0, 100)));
975976
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
976977
contactTime(new BrokerIdAndEpoch(1, 123)));
@@ -1068,41 +1069,40 @@ public void testDuplicateBrokerRegistrationWithInactiveBroker() {
10681069
clusterControl.activate();
10691070
clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(1, 20002);
10701071

1071-
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
1072+
assertEquals(OptionalLong.empty(), clusterControl.heartbeatManager().tracker().
10721073
contactTime(new BrokerIdAndEpoch(0, 100)));
10731074
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
10741075
contactTime(new BrokerIdAndEpoch(1, 200)));
10751076

10761077
time.sleep(brokerSessionTimeoutMs / 2);
1077-
assertThrows(DuplicateBrokerRegistrationException.class, () ->
1078-
clusterControl.registerBroker(new BrokerRegistrationRequestData().
1079-
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1080-
setBrokerId(0).
1081-
setLogDirs(List.of(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"))).
1082-
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
1083-
Set.of(new BrokerRegistrationRequestData.Feature().
1084-
setName(MetadataVersion.FEATURE_NAME).
1085-
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
1086-
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1087-
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
1088-
101L,
1089-
finalizedFeatures,
1090-
false));
1091-
// new registration should succeed immediatelly only if the broker is in controlled shutdown,
1092-
// even if the last heartbeat was within the session timeout
10931078
clusterControl.registerBroker(new BrokerRegistrationRequestData().
10941079
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1095-
setBrokerId(1).
1096-
setLogDirs(List.of(Uuid.fromString("b66ybsWIQoygs01vdjH07A"))).
1080+
setBrokerId(0).
1081+
setLogDirs(List.of(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"))).
10971082
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
10981083
Set.of(new BrokerRegistrationRequestData.Feature().
10991084
setName(MetadataVersion.FEATURE_NAME).
11001085
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
11011086
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1102-
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")),
1103-
201L,
1087+
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
1088+
101L,
11041089
finalizedFeatures,
11051090
false);
1091+
assertThrows(DuplicateBrokerRegistrationException.class, () -> {
1092+
clusterControl.registerBroker(new BrokerRegistrationRequestData().
1093+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1094+
setBrokerId(1).
1095+
setLogDirs(List.of(Uuid.fromString("b66ybsWIQoygs01vdjH07A"))).
1096+
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
1097+
Set.of(new BrokerRegistrationRequestData.Feature().
1098+
setName(MetadataVersion.FEATURE_NAME).
1099+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
1100+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1101+
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")),
1102+
201L,
1103+
finalizedFeatures,
1104+
false);
1105+
});
11061106
}
11071107

11081108
private FeatureControlManager createFeatureControlManager() {

0 commit comments

Comments
 (0)