Skip to content

Commit 7bc84d6

Browse files
authored
KAFKA-12467: Implement QuorumController snapshot generation (apache#10366)
Implement controller-side snapshot generation.Implement QuorumController snapshot generation. Note that this PR does not handle KRaft integration, just the internal snapshot record generation and consumption logic. Reading a snapshot is relatively straightforward. When the QuorumController starts up, it loads the most recent snapshot. This is just a series of records that we replay, plus a log offset ("snapshot epoch") that we advance to. Writing a snapshot is more complex. There are several components: the SnapshotWriter which persists the snapshot, the SnapshotGenerator which manages writing each batch of records, and the SnapshotGeneratorManager which interfaces the preceding two classes with the event queue. Controller snapshots are done incrementally. In order to avoid blocking the controller thread for a long time, we pull a few record batches at a time from our record batch iterators. These iterators are implemented by controller manager classes such as ReplicationControlManager, ClusterControlManager, etc. Finally, this PR adds ControllerTestUtils#deepSortRecords and ControllerTestUtils#assertBatchIteratorContains, which make it easier to write unit tests. Since records are often constructed from unsorted data structures, it is often useful to sort them before comparing them. Reviewers: David Arthur <[email protected]>
1 parent 137491c commit 7bc84d6

26 files changed

+1394
-53
lines changed

checkstyle/suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@
268268
files="(ReplicationControlManager|ReplicationControlManagerTest).java"/>
269269
<suppress checks="ClassFanOutComplexity"
270270
files="(QuorumController|ReplicationControlManager).java"/>
271+
<suppress checks="ParameterNumber"
272+
files="(QuorumController).java"/>
271273
<suppress checks="CyclomaticComplexity"
272274
files="(ReplicationControlManager).java"/>
273275
<suppress checks="NPathComplexity"

core/src/test/java/kafka/test/MockController.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
201201
throw new UnsupportedOperationException();
202202
}
203203

204+
@Override
205+
public CompletableFuture<Long> beginWritingSnapshot() {
206+
throw new UnsupportedOperationException();
207+
}
208+
204209
@Override
205210
public void beginShutdown() {
206211
this.active = false;

metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.config.internals.QuotaConfigs;
2222
import org.apache.kafka.common.errors.InvalidRequestException;
2323
import org.apache.kafka.common.metadata.QuotaRecord;
24+
import org.apache.kafka.common.metadata.QuotaRecord.EntityData;
2425
import org.apache.kafka.common.protocol.Errors;
2526
import org.apache.kafka.common.quota.ClientQuotaAlteration;
2627
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -35,18 +36,20 @@
3536
import java.util.Collection;
3637
import java.util.Collections;
3738
import java.util.HashMap;
39+
import java.util.Iterator;
3840
import java.util.List;
3941
import java.util.Map;
42+
import java.util.Map.Entry;
43+
import java.util.NoSuchElementException;
4044
import java.util.Objects;
4145
import java.util.function.Supplier;
4246
import java.util.stream.Collectors;
4347

4448

4549
public class ClientQuotaControlManager {
46-
4750
private final SnapshotRegistry snapshotRegistry;
4851

49-
final TimelineHashMap<ClientQuotaEntity, Map<String, Double>> clientQuotaData;
52+
final TimelineHashMap<ClientQuotaEntity, TimelineHashMap<String, Double>> clientQuotaData;
5053

5154
ClientQuotaControlManager(SnapshotRegistry snapshotRegistry) {
5255
this.snapshotRegistry = snapshotRegistry;
@@ -98,7 +101,7 @@ public void replay(QuotaRecord record) {
98101
Map<String, String> entityMap = new HashMap<>(2);
99102
record.entity().forEach(entityData -> entityMap.put(entityData.entityType(), entityData.entityName()));
100103
ClientQuotaEntity entity = new ClientQuotaEntity(entityMap);
101-
Map<String, Double> quotas = clientQuotaData.get(entity);
104+
TimelineHashMap<String, Double> quotas = clientQuotaData.get(entity);
102105
if (quotas == null) {
103106
quotas = new TimelineHashMap<>(snapshotRegistry, 0);
104107
clientQuotaData.put(entity, quotas);
@@ -136,14 +139,15 @@ private void alterClientQuotaEntity(
136139
}
137140

138141
// Don't share objects between different records
139-
Supplier<List<QuotaRecord.EntityData>> recordEntitySupplier = () ->
140-
validatedEntityMap.entrySet().stream().map(mapEntry -> new QuotaRecord.EntityData()
142+
Supplier<List<EntityData>> recordEntitySupplier = () ->
143+
validatedEntityMap.entrySet().stream().map(mapEntry -> new EntityData()
141144
.setEntityType(mapEntry.getKey())
142145
.setEntityName(mapEntry.getValue()))
143146
.collect(Collectors.toList());
144147

145148
List<ApiMessageAndVersion> newRecords = new ArrayList<>(newQuotaConfigs.size());
146-
Map<String, Double> currentQuotas = clientQuotaData.getOrDefault(entity, Collections.emptyMap());
149+
Map<String, Double> currentQuotas = clientQuotaData.containsKey(entity) ?
150+
clientQuotaData.get(entity) : Collections.emptyMap();
147151
newQuotaConfigs.forEach((key, newValue) -> {
148152
if (newValue == null) {
149153
if (currentQuotas.containsKey(key)) {
@@ -249,7 +253,7 @@ private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> va
249253
return new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity");
250254
}
251255

252-
for (Map.Entry<String, String> entityEntry : entity.entries().entrySet()) {
256+
for (Entry<String, String> entityEntry : entity.entries().entrySet()) {
253257
String entityType = entityEntry.getKey();
254258
String entityName = entityEntry.getValue();
255259
if (validatedEntityMap.containsKey(entityType)) {
@@ -272,4 +276,44 @@ private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> va
272276

273277
return ApiError.NONE;
274278
}
279+
280+
class ClientQuotaControlIterator implements Iterator<List<ApiMessageAndVersion>> {
281+
private final long epoch;
282+
private final Iterator<Entry<ClientQuotaEntity, TimelineHashMap<String, Double>>> iterator;
283+
284+
ClientQuotaControlIterator(long epoch) {
285+
this.epoch = epoch;
286+
this.iterator = clientQuotaData.entrySet(epoch).iterator();
287+
}
288+
289+
@Override
290+
public boolean hasNext() {
291+
return iterator.hasNext();
292+
}
293+
294+
@Override
295+
public List<ApiMessageAndVersion> next() {
296+
if (!hasNext()) throw new NoSuchElementException();
297+
Entry<ClientQuotaEntity, TimelineHashMap<String, Double>> entry = iterator.next();
298+
ClientQuotaEntity entity = entry.getKey();
299+
List<ApiMessageAndVersion> records = new ArrayList<>();
300+
for (Entry<String, Double> quotaEntry : entry.getValue().entrySet(epoch)) {
301+
QuotaRecord record = new QuotaRecord();
302+
for (Entry<String, String> entityEntry : entity.entries().entrySet()) {
303+
record.entity().add(new EntityData().
304+
setEntityType(entityEntry.getKey()).
305+
setEntityName(entityEntry.getValue()));
306+
}
307+
record.setKey(quotaEntry.getKey());
308+
record.setValue(quotaEntry.getValue());
309+
record.setRemove(false);
310+
records.add(new ApiMessageAndVersion(record, (short) 0));
311+
}
312+
return records;
313+
}
314+
}
315+
316+
ClientQuotaControlIterator iterator(long epoch) {
317+
return new ClientQuotaControlIterator(epoch);
318+
}
275319
}

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
2525
import org.apache.kafka.common.metadata.FenceBrokerRecord;
2626
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
27+
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
28+
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
29+
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
30+
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
2731
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
2832
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
2933
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -40,8 +44,11 @@
4044

4145
import java.util.ArrayList;
4246
import java.util.HashMap;
47+
import java.util.Iterator;
4348
import java.util.List;
4449
import java.util.Map;
50+
import java.util.Map.Entry;
51+
import java.util.NoSuchElementException;
4552
import java.util.Optional;
4653
import java.util.concurrent.CompletableFuture;
4754

@@ -184,7 +191,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
184191
setBrokerEpoch(brokerEpoch).
185192
setRack(request.rack());
186193
for (BrokerRegistrationRequestData.Listener listener : request.listeners()) {
187-
record.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
194+
record.endPoints().add(new BrokerEndpoint().
188195
setHost(listener.host()).
189196
setName(listener.name()).
190197
setPort(listener.port()).
@@ -199,7 +206,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
199206
"the broker has an unsupported version of " + feature.name());
200207
}
201208
}
202-
record.features().add(new RegisterBrokerRecord.BrokerFeature().
209+
record.features().add(new BrokerFeature().
203210
setName(feature.name()).
204211
setMinSupportedVersion(feature.minSupportedVersion()).
205212
setMaxSupportedVersion(feature.maxSupportedVersion()));
@@ -219,13 +226,13 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
219226
public void replay(RegisterBrokerRecord record) {
220227
int brokerId = record.brokerId();
221228
List<Endpoint> listeners = new ArrayList<>();
222-
for (RegisterBrokerRecord.BrokerEndpoint endpoint : record.endPoints()) {
229+
for (BrokerEndpoint endpoint : record.endPoints()) {
223230
listeners.add(new Endpoint(endpoint.name(),
224231
SecurityProtocol.forId(endpoint.securityProtocol()),
225232
endpoint.host(), endpoint.port()));
226233
}
227234
Map<String, VersionRange> features = new HashMap<>();
228-
for (RegisterBrokerRecord.BrokerFeature feature : record.features()) {
235+
for (BrokerFeature feature : record.features()) {
229236
features.put(feature.name(), new VersionRange(
230237
feature.minSupportedVersion(), feature.maxSupportedVersion()));
231238
}
@@ -343,4 +350,56 @@ public void addReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers
343350
readyBrokersFuture = Optional.empty();
344351
}
345352
}
353+
354+
class ClusterControlIterator implements Iterator<List<ApiMessageAndVersion>> {
355+
private final Iterator<Entry<Integer, BrokerRegistration>> iterator;
356+
357+
ClusterControlIterator(long epoch) {
358+
this.iterator = brokerRegistrations.entrySet(epoch).iterator();
359+
}
360+
361+
@Override
362+
public boolean hasNext() {
363+
return iterator.hasNext();
364+
}
365+
366+
@Override
367+
public List<ApiMessageAndVersion> next() {
368+
if (!hasNext()) throw new NoSuchElementException();
369+
Entry<Integer, BrokerRegistration> entry = iterator.next();
370+
int brokerId = entry.getKey();
371+
BrokerRegistration registration = entry.getValue();
372+
BrokerEndpointCollection endpoints = new BrokerEndpointCollection();
373+
for (Entry<String, Endpoint> endpointEntry : registration.listeners().entrySet()) {
374+
endpoints.add(new BrokerEndpoint().setName(endpointEntry.getKey()).
375+
setHost(endpointEntry.getValue().host()).
376+
setPort(endpointEntry.getValue().port()).
377+
setSecurityProtocol(endpointEntry.getValue().securityProtocol().id));
378+
}
379+
BrokerFeatureCollection features = new BrokerFeatureCollection();
380+
for (Entry<String, VersionRange> featureEntry : registration.supportedFeatures().entrySet()) {
381+
features.add(new BrokerFeature().setName(featureEntry.getKey()).
382+
setMaxSupportedVersion(featureEntry.getValue().max()).
383+
setMinSupportedVersion(featureEntry.getValue().min()));
384+
}
385+
List<ApiMessageAndVersion> batch = new ArrayList<>();
386+
batch.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
387+
setBrokerId(brokerId).
388+
setIncarnationId(registration.incarnationId()).
389+
setBrokerEpoch(registration.epoch()).
390+
setEndPoints(endpoints).
391+
setFeatures(features).
392+
setRack(registration.rack().orElse(null)), (short) 0));
393+
if (!registration.fenced()) {
394+
batch.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
395+
setId(brokerId).
396+
setEpoch(registration.epoch()), (short) 0));
397+
}
398+
return batch;
399+
}
400+
}
401+
402+
ClusterControlIterator iterator(long epoch) {
403+
return new ClusterControlIterator(epoch);
404+
}
346405
}

metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@
4040
import java.util.List;
4141
import java.util.Map;
4242
import java.util.Map.Entry;
43+
import java.util.NoSuchElementException;
4344
import java.util.Objects;
4445

4546
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
4647

48+
4749
public class ConfigurationControlManager {
4850
private final Logger log;
4951
private final SnapshotRegistry snapshotRegistry;
@@ -316,6 +318,9 @@ public void replay(ConfigRecord record) {
316318
} else {
317319
configs.put(record.name(), record.value());
318320
}
321+
if (configs.isEmpty()) {
322+
configData.remove(configResource);
323+
}
319324
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
320325
}
321326

@@ -368,4 +373,39 @@ public Map<ConfigResource, ResultOrError<Map<String, String>>> describeConfigs(
368373
void deleteTopicConfigs(String name) {
369374
configData.remove(new ConfigResource(Type.TOPIC, name));
370375
}
376+
377+
class ConfigurationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
378+
private final long epoch;
379+
private final Iterator<Entry<ConfigResource, TimelineHashMap<String, String>>> iterator;
380+
381+
ConfigurationControlIterator(long epoch) {
382+
this.epoch = epoch;
383+
this.iterator = configData.entrySet(epoch).iterator();
384+
}
385+
386+
@Override
387+
public boolean hasNext() {
388+
return iterator.hasNext();
389+
}
390+
391+
@Override
392+
public List<ApiMessageAndVersion> next() {
393+
if (!hasNext()) throw new NoSuchElementException();
394+
List<ApiMessageAndVersion> records = new ArrayList<>();
395+
Entry<ConfigResource, TimelineHashMap<String, String>> entry = iterator.next();
396+
ConfigResource resource = entry.getKey();
397+
for (Entry<String, String> configEntry : entry.getValue().entrySet(epoch)) {
398+
records.add(new ApiMessageAndVersion(new ConfigRecord().
399+
setResourceName(resource.name()).
400+
setResourceType(resource.type().id()).
401+
setName(configEntry.getKey()).
402+
setValue(configEntry.getValue()), (short) 0));
403+
}
404+
return records;
405+
}
406+
}
407+
408+
ConfigurationControlIterator iterator(long epoch) {
409+
return new ConfigurationControlIterator(epoch);
410+
}
371411
}

metadata/src/main/java/org/apache/kafka/controller/Controller.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,14 @@ CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
187187
Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
188188
);
189189

190+
/**
191+
* Begin writing a controller snapshot. If there was already an ongoing snapshot, it
192+
* simply returns information about that snapshot rather than starting a new one.
193+
*
194+
* @return A future yielding the epoch of the snapshot.
195+
*/
196+
CompletableFuture<Long> beginWritingSnapshot();
197+
190198
/**
191199
* Begin shutting down, but don't block. You must still call close to clean up all
192200
* resources.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.controller;
19+
20+
import java.util.List;
21+
import java.util.NoSuchElementException;
22+
23+
import org.apache.kafka.common.protocol.ApiMessage;
24+
25+
26+
public class EmptySnapshotReader implements SnapshotReader {
27+
private final long epoch;
28+
29+
public EmptySnapshotReader(long epoch) {
30+
this.epoch = epoch;
31+
}
32+
33+
@Override
34+
public long epoch() {
35+
return epoch;
36+
}
37+
38+
@Override
39+
public void close() {
40+
// Nothing to do
41+
}
42+
43+
@Override
44+
public boolean hasNext() {
45+
return false;
46+
}
47+
48+
@Override
49+
public List<ApiMessage> next() {
50+
throw new NoSuchElementException();
51+
}
52+
}

0 commit comments

Comments
 (0)