
Commit c6496e0

MINOR: Cleanup 0.10.x legacy references in ClusterResourceListener and TopicConfig (clients module) (apache#19388)
This PR is a minor follow-up to [PR apache#19320](apache#19320), which cleaned up 0.10.x legacy information from the clients module. It addresses remaining reviewer suggestions that were not included in the original PR:

- `ClusterResourceListener`: Removed "Note the minimum supported broker version is 2.1." per review suggestion, to avoid repeating version-specific details across multiple classes.
- `TopicConfig`: Simplified `MAX_MESSAGE_BYTES_DOC` by removing obsolete notes about behavior in versions prior to 0.10.2.

These changes reduce outdated version information in client documentation and improve clarity.

Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 8d66481 commit c6496e0

File tree: 9 files changed, +11 −40 lines

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 1 addition & 2 deletions
@@ -371,8 +371,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics";
     private static final String ALLOW_AUTO_CREATE_TOPICS_DOC = "Allow automatic topic creation on the broker when" +
         " subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the" +
-        " broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
-        " be set to `true` when using brokers older than 0.11.0";
+        " broker allows for it using `auto.create.topics.enable` broker configuration.";
     public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;

     /**
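For context, `allow.auto.create.topics` is set like any other consumer config. A minimal sketch of disabling it on a consumer; the bootstrap address, group id, and topic name are placeholders:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;
import java.util.Properties;

public class AutoCreateExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Disable client-side automatic topic creation on subscribe; the broker's
        // auto.create.topics.enable setting must also permit creation.
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
        }
    }
}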

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

Lines changed: 1 addition & 2 deletions
@@ -218,8 +218,7 @@ protected void handleFetchSuccess(final Node fetchTarget,
                 partition,
                 partitionData,
                 metricAggregator,
-                fetchOffset,
-                requestVersion);
+                fetchOffset);
             fetchBuffer.add(completedFetch);
         }

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java

Lines changed: 1 addition & 4 deletions
@@ -60,7 +60,6 @@ public class CompletedFetch {

     final TopicPartition partition;
     final FetchResponseData.PartitionData partitionData;
-    final short requestVersion;

     private final Logger log;
     private final SubscriptionState subscriptions;
@@ -88,8 +87,7 @@ public class CompletedFetch {
                    TopicPartition partition,
                    FetchResponseData.PartitionData partitionData,
                    FetchMetricsAggregator metricAggregator,
-                   Long fetchOffset,
-                   short requestVersion) {
+                   Long fetchOffset) {
         this.log = log;
         this.subscriptions = subscriptions;
         this.decompressionBufferSupplier = decompressionBufferSupplier;
@@ -98,7 +96,6 @@ public class CompletedFetch {
         this.metricAggregator = metricAggregator;
         this.batches = FetchResponse.recordsOrFail(partitionData).batches().iterator();
         this.nextFetchOffset = fetchOffset;
-        this.requestVersion = requestVersion;
         this.lastEpoch = Optional.empty();
         this.abortedProducerIds = new HashSet<>();
         this.abortedTransactions = abortedTransactions(partitionData);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java

Lines changed: 4 additions & 18 deletions
@@ -16,13 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals;

-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -37,7 +35,6 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;

@@ -263,21 +260,10 @@ private CompletedFetch handleInitializeSuccess(final CompletedFetch completedFet
         Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();

         if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
-            if (completedFetch.requestVersion < 3) {
-                // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
-                Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
-                throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
-                    recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
-                    " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
-                    "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
-                    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
-                    recordTooLargePartitions);
-            } else {
-                // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
-                throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
-                    fetchOffset + ". Received a non-empty fetch response from the server, but no " +
-                    "complete records were found.");
-            }
+            // This should not happen with brokers that support FetchRequest/Response V4 or higher (i.e. KIP-74)
+            throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
+                fetchOffset + ". Received a non-empty fetch response from the server, but no " +
+                "complete records were found.");
         }

         if (!updatePartitionState(partition, tp)) {

clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java

Lines changed: 0 additions & 1 deletion
@@ -24,7 +24,6 @@
  * <p>
  * <h4>Clients</h4>
  * There will be one invocation of {@link ClusterResourceListener#onUpdate(ClusterResource)} after each metadata response.
- * Note the minimum supported broker version is 2.1.
  * <p>
  * {@link org.apache.kafka.clients.producer.ProducerInterceptor} : The {@link ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked after {@link org.apache.kafka.clients.producer.ProducerInterceptor#onSend(org.apache.kafka.clients.producer.ProducerRecord)}
  * but before {@link org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement(org.apache.kafka.clients.producer.RecordMetadata, Exception)} .
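As background for the javadoc above, `ClusterResourceListener` is typically implemented by interceptors, serializers, or metric reporters that want cluster metadata. A minimal sketch of a producer interceptor that logs the cluster id on each metadata update; the class name and logging are illustrative:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;

import java.util.Map;

public class ClusterIdLoggingInterceptor
        implements ProducerInterceptor<String, String>, ClusterResourceListener {

    // Invoked once after each metadata response, per the javadoc above.
    @Override
    public void onUpdate(ClusterResource clusterResource) {
        System.out.println("Cluster id: " + clusterResource.clusterId());
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record; // pass records through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}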

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 1 addition & 4 deletions
@@ -108,10 +108,7 @@ public class TopicConfig {

     public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
     public static final String MAX_MESSAGE_BYTES_DOC =
-        "The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
-        "In the latest message format version, records are always grouped into batches for efficiency. " +
-        "In previous message format versions, uncompressed records are not grouped into batches and this " +
-        "limit only applies to a single record in that case.";
+        "The largest record batch size allowed by Kafka (after compression if compression is enabled).";

     public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
     public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
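For context, `max.message.bytes` is a per-topic setting usually supplied at topic creation. A minimal sketch using the Admin client; the topic name, partition count, replication factor, size, and bootstrap address are placeholders:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Cap record batches for this topic at 2 MB (after compression).
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1)
                    .configs(Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "2097152"));
            admin.createTopics(Set.of(topic)).all().get();
        }
    }
}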

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java

Lines changed: 1 addition & 3 deletions
@@ -26,7 +26,6 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -227,8 +226,7 @@ private CompletedFetch newCompletedFetch(long fetchOffset,
                 TP,
                 partitionData,
                 metricAggregator,
-                fetchOffset,
-                ApiKeys.FETCH.latestVersion());
+                fetchOffset);
     }

     private static Deserializers<UUID, UUID> newUuidDeserializers() {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java

Lines changed: 1 addition & 3 deletions
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
@@ -198,8 +197,7 @@ private CompletedFetch completedFetch(TopicPartition tp) {
                 tp,
                 partitionData,
                 metricsAggregator,
-                0L,
-                ApiKeys.FETCH.latestVersion());
+                0L);
     }

     /**

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java

Lines changed: 1 addition & 3 deletions
@@ -26,7 +26,6 @@
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.EndTransactionMarker;
@@ -921,8 +920,7 @@ private CompletedFetch build() {
                 topicPartition,
                 partitionData,
                 metricsAggregator,
-                fetchOffset,
-                ApiKeys.FETCH.latestVersion());
+                fetchOffset);
     }
 }
