Skip to content

Commit e4f2f6f

Browse files
author
Boyang Chen
authored
KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017)
Stop the public partitionsFor API from returning null for unknown topics; it now returns an empty list instead, so callers no longer risk a NullPointerException. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 77573d8 commit e4f2f6f

File tree

5 files changed

+34
-11
lines changed

5 files changed

+34
-11
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1901,7 +1901,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
19011901
*
19021902
* @param topic The topic to get partition metadata for
19031903
*
1904-
* @return The list of partitions
1904+
* @return The list of partitions, which will be empty when the given topic is not found
19051905
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
19061906
* function is called
19071907
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1924,7 +1924,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
19241924
* @param topic The topic to get partition metadata for
19251925
* @param timeout The maximum of time to await topic metadata
19261926
*
1927-
* @return The list of partitions
1927+
* @return The list of partitions, which will be empty when the given topic is not found
19281928
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
19291929
* function is called
19301930
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1948,7 +1948,7 @@ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
19481948
Timer timer = time.timer(timeout);
19491949
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
19501950
new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
1951-
return topicMetadata.get(topic);
1951+
return topicMetadata.getOrDefault(topic, Collections.emptyList());
19521952
} finally {
19531953
release();
19541954
}

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOff
376376
@Override
377377
public synchronized List<PartitionInfo> partitionsFor(String topic) {
378378
ensureNotClosed();
379-
return this.partitions.get(topic);
379+
return this.partitions.getOrDefault(topic, Collections.emptyList());
380380
}
381381

382382
@Override

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,28 @@ private void consumerCloseTest(final long closeTimeoutMs,
17571757
}
17581758
}
17591759

1760+
@Test
1761+
public void testPartitionsForNonExistingTopic() {
1762+
Time time = new MockTime();
1763+
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
1764+
ConsumerMetadata metadata = createMetadata(subscription);
1765+
MockClient client = new MockClient(time, metadata);
1766+
1767+
initMetadata(client, Collections.singletonMap(topic, 1));
1768+
Cluster cluster = metadata.fetch();
1769+
1770+
MetadataResponse updateResponse = RequestTestUtils.metadataResponse(cluster.nodes(),
1771+
cluster.clusterResource().clusterId(),
1772+
cluster.controller().id(),
1773+
Collections.emptyList());
1774+
client.prepareResponse(updateResponse);
1775+
1776+
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
1777+
1778+
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
1779+
assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
1780+
}
1781+
17601782
@Test
17611783
public void testPartitionsForAuthenticationFailure() {
17621784
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();

connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,12 @@ public void start() {
175175
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
176176
long started = time.nanoseconds();
177177
long sleepMs = 100;
178-
while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
178+
while (partitionInfos.isEmpty() && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
179179
time.sleep(sleepMs);
180180
sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
181181
partitionInfos = consumer.partitionsFor(topic);
182182
}
183-
if (partitionInfos == null)
183+
if (partitionInfos.isEmpty())
184184
throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +
185185
" allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
186186
" this is your first use of the topic it may have taken too long to create.");

connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
2121
import org.apache.kafka.common.KafkaException;
22+
import org.apache.kafka.common.PartitionInfo;
2223
import org.apache.kafka.common.TopicPartition;
2324
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
2425
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -40,7 +41,6 @@
4041
import java.util.List;
4142
import java.util.Map;
4243
import java.util.Objects;
43-
import java.util.Optional;
4444
import java.util.Properties;
4545
import java.util.Set;
4646
import java.util.stream.Collectors;
@@ -260,10 +260,11 @@ public void assertNoTopicStatusInStatusTopic() {
260260
Consumer<byte[], byte[]> verifiableConsumer = connect.kafka().createConsumer(
261261
Collections.singletonMap("group.id", "verifiable-consumer-group-0"));
262262

263-
List<TopicPartition> partitions =
264-
Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic))
265-
.orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic"))
266-
.stream()
263+
List<PartitionInfo> partitionInfos = verifiableConsumer.partitionsFor(statusTopic);
264+
if (partitionInfos.isEmpty()) {
265+
throw new AssertionError("Unable to retrieve partitions info for status topic");
266+
}
267+
List<TopicPartition> partitions = partitionInfos.stream()
267268
.map(info -> new TopicPartition(info.topic(), info.partition()))
268269
.collect(Collectors.toList());
269270
verifiableConsumer.assign(partitions);

0 commit comments

Comments
 (0)