
Commit a5325e0

KAFKA-17431: Support invalid static configs for KRaft so long as dynamic configs are valid (apache#18949)
During broker startup, attempt to read dynamic configurations from the latest local metadata snapshot on disk. This avoids most situations where the static configuration alone is not sufficient to start the broker, but the static configuration combined with the dynamic configuration would have been. The PR includes an integration test.

Reviewers: Colin P. McCabe <[email protected]>
1 parent 3ed5902 commit a5325e0
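
For context, the dynamic configurations this change recovers are the ones persisted in the cluster metadata log as ConfigRecords, for example a per-listener SSL keystore applied through the Admin API. The sketch below is illustrative only and is not part of this commit; the bootstrap address, broker id, listener name, object name, and keystore path are assumptions.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object SetDynamicKeystore {
  def main(args: Array[String]): Unit = {
    val adminProps = new Properties()
    adminProps.put("bootstrap.servers", "localhost:9092") // assumption: local test cluster
    val admin = Admin.create(adminProps)
    try {
      // Dynamic per-broker config; in KRaft mode the controller persists this as a
      // ConfigRecord in the metadata log, so it also appears in later snapshots.
      val broker = new ConfigResource(ConfigResource.Type.BROKER, "0") // assumption: broker id 0
      val op = new AlterConfigOp(
        new ConfigEntry("listener.name.external.ssl.keystore.location", "/path/to/keystore.jks"),
        AlterConfigOp.OpType.SET
      )
      admin.incrementalAlterConfigs(
        Collections.singletonMap[ConfigResource, java.util.Collection[AlterConfigOp]](
          broker, Collections.singletonList(op))
      ).all().get()
    } finally {
      admin.close()
    }
  }
}

With a record like this in the latest local snapshot, the readDynamicBrokerConfigsFromSnapshot method added below can reapply it during startup even when the corresponding static configuration is missing or invalid.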

4 files changed, +122 -30 lines changed


core/src/main/scala/kafka/raft/RaftManager.scala

Lines changed: 5 additions & 1 deletion
@@ -104,13 +104,15 @@ trait RaftManager[T] {
   def replicatedLog: ReplicatedLog
 
   def voterNode(id: Int, listener: ListenerName): Option[Node]
+
+  def recordSerde: RecordSerde[T]
 }
 
 class KafkaRaftManager[T](
   clusterId: String,
   config: KafkaConfig,
   metadataLogDirUuid: Uuid,
-  recordSerde: RecordSerde[T],
+  serde: RecordSerde[T],
   topicPartition: TopicPartition,
   topicId: Uuid,
   time: Time,
@@ -298,4 +300,6 @@ class KafkaRaftManager[T](
   override def voterNode(id: Int, listener: ListenerName): Option[Node] = {
     client.voterNode(id, listener).toScala
   }
+
+  override def recordSerde: RecordSerde[T] = serde
 }
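
The new recordSerde accessor lets code outside KafkaRaftManager decode snapshot records. The following is a minimal sketch of that usage pattern, mirroring the readDynamicBrokerConfigsFromSnapshot change below; SnapshotReadSketch and countSnapshotRecords are hypothetical names, not part of this commit.

import kafka.raft.KafkaRaftManager
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.RecordsSnapshotReader

object SnapshotReadSketch {
  // Count the records in the latest local metadata snapshot, if one exists.
  def countSnapshotRecords(raftManager: KafkaRaftManager[ApiMessageAndVersion]): Long = {
    var count = 0L
    raftManager.replicatedLog.latestSnapshotId().ifPresent(snapshotId =>
      raftManager.replicatedLog.readSnapshot(snapshotId).ifPresent(rawReader => {
        val reader = RecordsSnapshotReader.of(
          rawReader,
          raftManager.recordSerde, // exposed by this change
          BufferSupplier.create(),
          KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
          true
        )
        while (reader.hasNext) {
          count += reader.next().records().size()
        }
      })
    )
    count
  }
}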

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 3 additions & 2 deletions
@@ -194,7 +194,10 @@ class BrokerServer(
       info("Starting broker")
 
       val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
+
       config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")
+      DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers)
 
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -203,8 +206,6 @@ class BrokerServer(
       /* register broker metrics */
       brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
 
-      quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")
-
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
       metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 53 additions & 2 deletions
@@ -23,23 +23,28 @@ import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.log.{LogCleaner, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.raft.KafkaRaftManager
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.network.EndPoint
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
 import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
-import org.apache.kafka.common.utils.{ConfigUtils, Utils}
+import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.server.{ProcessRole, DynamicThreadPool}
+import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
 import org.apache.kafka.server.telemetry.ClientTelemetry
+import org.apache.kafka.snapshot.RecordsSnapshotReader
 import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
 
 import scala.collection._
@@ -185,6 +190,52 @@ object DynamicBrokerConfig {
     }
     props
   }
+
+  private[server] def readDynamicBrokerConfigsFromSnapshot(
+    raftManager: KafkaRaftManager[ApiMessageAndVersion],
+    config: KafkaConfig,
+    quotaManagers: QuotaFactory.QuotaManagers
+  ): Unit = {
+    def putOrRemoveIfNull(props: Properties, key: String, value: String): Unit = {
+      if (value == null) {
+        props.remove(key)
+      } else {
+        props.put(key, value)
+      }
+    }
+    raftManager.replicatedLog.latestSnapshotId().ifPresent(latestSnapshotId => {
+      raftManager.replicatedLog.readSnapshot(latestSnapshotId).ifPresent(rawSnapshotReader => {
+        val reader = RecordsSnapshotReader.of(
+          rawSnapshotReader,
+          raftManager.recordSerde,
+          BufferSupplier.create(),
+          KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+          true
+        )
+        val dynamicPerBrokerConfigs = new Properties()
+        val dynamicDefaultConfigs = new Properties()
+        while (reader.hasNext) {
+          val batch = reader.next()
+          batch.forEach(record => {
+            if (record.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id) {
+              val configRecord = record.message().asInstanceOf[ConfigRecord]
+              if (DynamicBrokerConfig.AllDynamicConfigs.contains(configRecord.name()) &&
+                  configRecord.resourceType() == ConfigResource.Type.BROKER.id()) {
+                if (configRecord.resourceName().isEmpty) {
+                  putOrRemoveIfNull(dynamicDefaultConfigs, configRecord.name(), configRecord.value())
+                } else if (configRecord.resourceName() == config.brokerId.toString) {
+                  putOrRemoveIfNull(dynamicPerBrokerConfigs, configRecord.name(), configRecord.value())
+                }
+              }
+            }
+          })
+        }
+        val configHandler = new BrokerConfigHandler(config, quotaManagers)
+        configHandler.processConfigChanges("", dynamicDefaultConfigs)
+        configHandler.processConfigChanges(config.brokerId.toString, dynamicPerBrokerConfigs)
+      })
+    })
+  }
 }
 
 class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging {

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

Lines changed: 61 additions & 25 deletions
@@ -56,7 +56,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.ShutdownableThread
@@ -116,30 +116,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     clearLeftOverProcessorMetrics() // clear metrics left over from other tests so that new ones can be tested
 
     (0 until numServers).foreach { brokerId =>
-
-      val props = TestUtils.createBrokerConfig(brokerId)
-      props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
-      props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
-      // Ensure that we can support multiple listeners per security protocol and multiple security protocols
-      props.put(SocketServerConfigs.LISTENERS_CONFIG, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
-      props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
-      props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, SecureInternal)
-      props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested")
-      props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN")
-      props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
-      props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") // low value to test log rolling on config update
-      props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
-      props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
-      props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
-
-      props ++= sslProperties1
-      props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal))
-
-      // Set invalid top-level properties to ensure that listener config is used
-      // Don't set any dynamic configs here since they get overridden in tests
-      props ++= invalidSslProperties
-      props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS)
-      props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
+      val props = defaultStaticConfig(brokerId)
 
       val kafkaConfig = KafkaConfig.fromProps(props)
 
@@ -157,6 +134,33 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     TestMetricsReporter.testReporters.clear()
   }
 
+  def defaultStaticConfig(brokerId: Int): Properties = {
+    val props = TestUtils.createBrokerConfig(brokerId)
+    props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+    props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
+    // Ensure that we can support multiple listeners per security protocol and multiple security protocols
+    props.put(SocketServerConfigs.LISTENERS_CONFIG, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+    props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"PLAINTEXT:PLAINTEXT, $SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
+    props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, SecureInternal)
+    props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested")
+    props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN")
+    props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
+    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") // low value to test log rolling on config update
+    props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
+    props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
+    props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
+
+    props ++= sslProperties1
+    props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal))
+
+    // Set invalid top-level properties to ensure that listener config is used
+    // Don't set any dynamic configs here since they get overridden in tests
+    props ++= invalidSslProperties
+    props ++= securityProps(invalidSslProperties, KEYSTORE_PROPS)
+    props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
+    props
+  }
+
   @AfterEach
   override def tearDown(): Unit = {
     clientThreads.foreach(_.interrupt())
@@ -1090,6 +1094,38 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifyConfiguration(true)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(quorum: String, groupProtocol: String): Unit = {
+    // modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
+    val props = defaultStaticConfig(numServers)
+    props.put(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
+
+    val kafkaConfig = KafkaConfig.fromProps(props)
+    val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
+    servers += newBroker
+
+    alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal))
+
+    TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
+
+    TestUtils.waitUntilTrue(
+      () => newBroker.raftManager.replicatedLog.latestSnapshotId().isPresent,
+      "metadata snapshot not present on broker",
+      30000L
+    )
+
+    // shutdown broker and attempt to restart it after invalidating its static configurations
+    newBroker.shutdown()
+    newBroker.awaitShutdown()
+
+    val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
+    invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS, listenerPrefix(SecureExternal)))
+    newBroker.config.updateCurrentConfig(KafkaConfig.fromProps(invalidStaticConfigs))
+
+    newBroker.startup()
+  }
+
   private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
     TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment")
     consumer.assignment.forEach(tp => consumer.position(tp))

0 commit comments
