Skip to content

Commit 6e446f0

Browse files
authored
KAFKA-19047: Allow quickly re-registering brokers that are in controlled shutdown (apache#19296)
Allow re-registration of brokers with active sessions if the previous broker registration was in controlled shutdown. Reviewers: Colin P. McCabe <[email protected]>, Reviewers: José Armando García Sancio <[email protected]>, David Mao <[email protected]>
1 parent 434b0d3 commit 6e446f0

File tree

2 files changed

+130
-3
lines changed

2 files changed

+130
-3
lines changed

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,10 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
353353
if (existing != null) {
354354
prevIncarnationId = existing.incarnationId();
355355
storedBrokerEpoch = existing.epoch();
356-
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
356+
if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && !existing.inControlledShutdown()) {
357357
if (!request.incarnationId().equals(prevIncarnationId)) {
358-
throw new DuplicateBrokerRegistrationException("Another broker is " +
359-
"registered with that broker id.");
358+
throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id. If the broker " +
359+
"was recently restarted this should self-resolve once the heartbeat manager expires the broker's session.");
360360
}
361361
}
362362
}

metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.DirectoryId;
2121
import org.apache.kafka.common.Endpoint;
2222
import org.apache.kafka.common.Uuid;
23+
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
2324
import org.apache.kafka.common.errors.InconsistentClusterIdException;
2425
import org.apache.kafka.common.errors.InvalidRegistrationException;
2526
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -70,6 +71,7 @@
7071
import java.util.Optional;
7172
import java.util.OptionalLong;
7273
import java.util.Set;
74+
import java.util.concurrent.TimeUnit;
7375
import java.util.stream.Stream;
7476

7577
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -978,6 +980,131 @@ public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
978980
contactTime(new BrokerIdAndEpoch(2, 100)));
979981
}
980982

983+
@Test
984+
public void testDuplicateBrokerRegistrationWithActiveOldBroker() {
985+
// active here means brokerHeartbeatManager last recorded the broker as unfenced and not in controlled shutdown
986+
long brokerSessionTimeoutMs = 1000;
987+
MockTime time = new MockTime(0L, 20L, 1000L);
988+
FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(
989+
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
990+
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
991+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
992+
setFeatureControlManager(createFeatureControlManager()).
993+
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
994+
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
995+
setTime(time).
996+
build();
997+
clusterControl.replay(new RegisterBrokerRecord().
998+
setBrokerEpoch(100).
999+
setBrokerId(0).
1000+
setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
1001+
setFenced(false), 10002);
1002+
clusterControl.activate();
1003+
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
1004+
contactTime(new BrokerIdAndEpoch(0, 100)));
1005+
1006+
// while session is still valid for old broker, duplicate requests should fail
1007+
time.sleep(brokerSessionTimeoutMs / 2);
1008+
assertThrows(DuplicateBrokerRegistrationException.class, () ->
1009+
clusterControl.registerBroker(new BrokerRegistrationRequestData().
1010+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1011+
setBrokerId(0).
1012+
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
1013+
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
1014+
Set.of(new BrokerRegistrationRequestData.Feature().
1015+
setName(MetadataVersion.FEATURE_NAME).
1016+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
1017+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1018+
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
1019+
101L,
1020+
finalizedFeatures,
1021+
false));
1022+
1023+
// if session expires for broker, even if the broker was active the new registration will succeed
1024+
time.sleep(brokerSessionTimeoutMs);
1025+
clusterControl.registerBroker(new BrokerRegistrationRequestData().
1026+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1027+
setBrokerId(0).
1028+
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
1029+
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
1030+
Set.of(new BrokerRegistrationRequestData.Feature().
1031+
setName(MetadataVersion.FEATURE_NAME).
1032+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
1033+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1034+
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
1035+
101L,
1036+
finalizedFeatures,
1037+
false);
1038+
}
1039+
1040+
@Test
1041+
public void testDuplicateBrokerRegistrationWithInactiveBroker() {
1042+
// inactive here means brokerHeartbeatManager last recorded the broker as fenced or in controlled shutdown
1043+
long brokerSessionTimeoutMs = 1000;
1044+
MockTime time = new MockTime(0L, 20L, 1000L);
1045+
FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures(
1046+
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
1047+
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
1048+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1049+
setFeatureControlManager(createFeatureControlManager()).
1050+
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }).
1051+
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
1052+
setTime(time).
1053+
build();
1054+
// first broker is fenced
1055+
clusterControl.replay(new RegisterBrokerRecord().
1056+
setBrokerEpoch(100).
1057+
setBrokerId(0).
1058+
setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
1059+
setFenced(true).
1060+
setInControlledShutdown(false), 10002);
1061+
// second broker is in controlled shutdown
1062+
clusterControl.replay(new RegisterBrokerRecord().
1063+
setBrokerEpoch(200).
1064+
setBrokerId(1).
1065+
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
1066+
setFenced(false).
1067+
setInControlledShutdown(true), 20002);
1068+
clusterControl.activate();
1069+
clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(1, 20002);
1070+
1071+
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
1072+
contactTime(new BrokerIdAndEpoch(0, 100)));
1073+
assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker().
1074+
contactTime(new BrokerIdAndEpoch(1, 200)));
1075+
1076+
time.sleep(brokerSessionTimeoutMs / 2);
1077+
assertThrows(DuplicateBrokerRegistrationException.class, () ->
1078+
clusterControl.registerBroker(new BrokerRegistrationRequestData().
1079+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1080+
setBrokerId(0).
1081+
setLogDirs(List.of(Uuid.fromString("yJGxmjfbQZSVFAlNM3uXZg"))).
1082+
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
1083+
Set.of(new BrokerRegistrationRequestData.Feature().
1084+
setName(MetadataVersion.FEATURE_NAME).
1085+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
1086+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1087+
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
1088+
101L,
1089+
finalizedFeatures,
1090+
false));
1091+
// new registration should succeed immediatelly only if the broker is in controlled shutdown,
1092+
// even if the last heartbeat was within the session timeout
1093+
clusterControl.registerBroker(new BrokerRegistrationRequestData().
1094+
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
1095+
setBrokerId(1).
1096+
setLogDirs(List.of(Uuid.fromString("b66ybsWIQoygs01vdjH07A"))).
1097+
setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
1098+
Set.of(new BrokerRegistrationRequestData.Feature().
1099+
setName(MetadataVersion.FEATURE_NAME).
1100+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
1101+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
1102+
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")),
1103+
201L,
1104+
finalizedFeatures,
1105+
false);
1106+
}
1107+
9811108
private FeatureControlManager createFeatureControlManager() {
9821109
FeatureControlManager featureControlManager = new FeatureControlManager.Builder().build();
9831110
featureControlManager.replay(new FeatureLevelRecord().

0 commit comments

Comments
 (0)