Skip to content

Commit 2f36001

Browse files
authored
KAFKA-12579: Remove various deprecated clients classes/methods for 3.0 (apache#10438)
* Remove `ExtendedSerializer` and `ExtendedDeserializer`, deprecated since 2.1. The extra functionality was also made available in `Serializer` and `Deserializer`. * Remove `close(long, TimeUnit)` from the producer, consumer and admin client, deprecated since 2.0 for the consumer and 2.2 for the rest. The replacement is `close(Duration)`. * Remove `ConsumerConfig.addDeserializerToConfig` and `ProducerConfig.addSerializerToConfig`, deprecated since 2.7 with no replacement. These methods were not intended to be public API and are likely not used much (if at all). * Remove `NoOffsetForPartitionException.partition()`, deprecated since 0.11. `partitions()` should be used instead. * Remove `MessageFormatter.init(Properties)`, deprecated since 2.7. The `configure(Map)` method should be used instead. * Remove `kafka.common.MessageFormatter`, deprecated since 2.7. `org.apache.kafka.common.MessageFormatter` should be used instead. Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>
1 parent 267b736 commit 2f36001

File tree

19 files changed

+36
-434
lines changed

19 files changed

+36
-434
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Optional;
2525
import java.util.Properties;
2626
import java.util.Set;
27-
import java.util.concurrent.TimeUnit;
2827

2928
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3029
import org.apache.kafka.common.ElectionType;
@@ -147,28 +146,11 @@ static Admin create(Map<String, Object> conf) {
147146
/**
148147
* Close the Admin and release all associated resources.
149148
* <p>
150-
* See {@link #close(long, TimeUnit)}
149+
* See {@link #close(Duration)}
151150
*/
152151
@Override
153152
default void close() {
154-
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
155-
}
156-
157-
/**
158-
* Close the Admin and release all associated resources.
159-
* <p>
160-
* The close operation has a grace period during which current operations will be allowed to
161-
* complete, specified by the given duration and time unit.
162-
* New operations will not be accepted during the grace period. Once the grace period is over,
163-
* all operations that have not yet been completed will be aborted with a {@link org.apache.kafka.common.errors.TimeoutException}.
164-
*
165-
* @param duration The duration to use for the wait time.
166-
* @param unit The time unit to use for the wait time.
167-
* @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
168-
*/
169-
@Deprecated
170-
default void close(long duration, TimeUnit unit) {
171-
close(Duration.ofMillis(unit.toMillis(duration)));
153+
close(Duration.ofMillis(Long.MAX_VALUE));
172154
}
173155

174156
/**

clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Map;
2929
import java.util.OptionalLong;
3030
import java.util.Set;
31-
import java.util.concurrent.TimeUnit;
3231
import java.util.regex.Pattern;
3332

3433
/**
@@ -264,12 +263,6 @@ public interface Consumer<K, V> extends Closeable {
264263
*/
265264
void close();
266265

267-
/**
268-
* @see KafkaConsumer#close(long, TimeUnit)
269-
*/
270-
@Deprecated
271-
void close(long timeout, TimeUnit unit);
272-
273266
/**
274267
* @see KafkaConsumer#close(Duration)
275268
*/

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -578,19 +578,9 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
578578
}
579579
}
580580

581-
/**
582-
* @deprecated Since 2.7.0. This will be removed in a future major release.
583-
*/
584-
@Deprecated
585-
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
586-
Deserializer<?> keyDeserializer,
587-
Deserializer<?> valueDeserializer) {
588-
return appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer);
589-
}
590-
591-
static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
592-
Deserializer<?> keyDeserializer,
593-
Deserializer<?> valueDeserializer) {
581+
protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
582+
Deserializer<?> keyDeserializer,
583+
Deserializer<?> valueDeserializer) {
594584
Map<String, Object> newConfigs = new HashMap<>(configs);
595585
if (keyDeserializer != null)
596586
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
@@ -599,22 +589,6 @@ static Map<String, Object> appendDeserializerToConfig(Map<String, Object> config
599589
return newConfigs;
600590
}
601591

602-
/**
603-
* @deprecated Since 2.7.0. This will be removed in a future major release.
604-
*/
605-
@Deprecated
606-
public static Properties addDeserializerToConfig(Properties properties,
607-
Deserializer<?> keyDeserializer,
608-
Deserializer<?> valueDeserializer) {
609-
Properties newProperties = new Properties();
610-
newProperties.putAll(properties);
611-
if (keyDeserializer != null)
612-
newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
613-
if (valueDeserializer != null)
614-
newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
615-
return newProperties;
616-
}
617-
618592
boolean maybeOverrideEnableAutoCommit() {
619593
Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
620594
boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2308,29 +2308,6 @@ public void close() {
23082308
close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
23092309
}
23102310

2311-
/**
2312-
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
2313-
* {@code timeout} for the consumer to complete pending commits and leave the group.
2314-
* If auto-commit is enabled, this will commit the current offsets if possible within the
2315-
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
2316-
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
2317-
* used to interrupt close.
2318-
*
2319-
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
2320-
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
2321-
* @param timeUnit The time unit for the {@code timeout}
2322-
* @throws IllegalArgumentException If the {@code timeout} is negative.
2323-
* @throws InterruptException If the thread is interrupted before or while this function is called
2324-
* @throws org.apache.kafka.common.KafkaException for any other error during close
2325-
*
2326-
* @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}.
2327-
*/
2328-
@Deprecated
2329-
@Override
2330-
public void close(long timeout, TimeUnit timeUnit) {
2331-
close(Duration.ofMillis(timeUnit.toMillis(timeout)));
2332-
}
2333-
23342311
/**
23352312
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
23362313
* {@code timeout} for the consumer to complete pending commits and leave the group.

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.util.OptionalLong;
4141
import java.util.Queue;
4242
import java.util.Set;
43-
import java.util.concurrent.TimeUnit;
4443
import java.util.concurrent.atomic.AtomicBoolean;
4544
import java.util.regex.Pattern;
4645
import java.util.stream.Collectors;
@@ -447,13 +446,7 @@ public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartiti
447446
}
448447

449448
@Override
450-
public synchronized void close() {
451-
close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
452-
}
453-
454-
@Deprecated
455-
@Override
456-
public synchronized void close(long timeout, TimeUnit unit) {
449+
public final synchronized void close() {
457450
this.closed = true;
458451
}
459452

clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,6 @@ public NoOffsetForPartitionException(Collection<TopicPartition> partitions) {
4343
this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
4444
}
4545

46-
/**
47-
* returns the first partition (out of {@link #partitions}) for which no offset is defined.
48-
* @deprecated please use {@link #partitions}
49-
* @return a partition with no offset
50-
*/
51-
@Deprecated
52-
public TopicPartition partition() {
53-
return partitions.isEmpty() ? null : partitions.iterator().next();
54-
}
55-
5646
/**
5747
* returns all partitions for which no offests are defined.
5848
* @return all partitions without offsets

clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.concurrent.Future;
32-
import java.util.concurrent.TimeUnit;
3332

3433
/**
3534
* The interface for the {@link KafkaProducer}
@@ -100,11 +99,6 @@ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
10099
*/
101100
void close();
102101

103-
@Deprecated
104-
default void close(long timeout, TimeUnit unit) {
105-
close(Duration.ofMillis(unit.toMillis(timeout)));
106-
}
107-
108102
/**
109103
* See {@link KafkaProducer#close(Duration)}
110104
*/

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -490,15 +490,6 @@ private static String parseAcks(String acksString) {
490490
}
491491
}
492492

493-
/**
494-
* @deprecated Since 2.7.0. This will be removed in a future major release.
495-
*/
496-
@Deprecated
497-
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
498-
Serializer<?> keySerializer, Serializer<?> valueSerializer) {
499-
return appendSerializerToConfig(configs, keySerializer, valueSerializer);
500-
}
501-
502493
static Map<String, Object> appendSerializerToConfig(Map<String, Object> configs,
503494
Serializer<?> keySerializer,
504495
Serializer<?> valueSerializer) {
@@ -510,22 +501,6 @@ static Map<String, Object> appendSerializerToConfig(Map<String, Object> configs,
510501
return newConfigs;
511502
}
512503

513-
/**
514-
* @deprecated Since 2.7.0. This will be removed in a future major release.
515-
*/
516-
@Deprecated
517-
public static Properties addSerializerToConfig(Properties properties,
518-
Serializer<?> keySerializer,
519-
Serializer<?> valueSerializer) {
520-
Properties newProperties = new Properties();
521-
newProperties.putAll(properties);
522-
if (keySerializer != null)
523-
newProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
524-
if (valueSerializer != null)
525-
newProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
526-
return newProperties;
527-
}
528-
529504
public ProducerConfig(Properties props) {
530505
super(CONFIG, props);
531506
}

clients/src/main/java/org/apache/kafka/common/MessageFormatter.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.Closeable;
2020
import java.io.PrintStream;
2121
import java.util.Map;
22-
import java.util.Properties;
2322

2423
import org.apache.kafka.clients.consumer.ConsumerRecord;
2524

@@ -34,33 +33,21 @@
3433
*/
3534
public interface MessageFormatter extends Configurable, Closeable {
3635

37-
/**
38-
* Initialises the MessageFormatter
39-
* @param props Properties to configure the formatter
40-
* @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface
41-
*/
42-
@Deprecated
43-
default public void init(Properties props) {}
44-
4536
/**
4637
* Configures the MessageFormatter
4738
* @param configs Map to configure the formatter
4839
*/
49-
default public void configure(Map<String, ?> configs) {
50-
Properties properties = new Properties();
51-
properties.putAll(configs);
52-
init(properties);
53-
}
40+
default void configure(Map<String, ?> configs) {}
5441

5542
/**
5643
* Parses and formats a record for display
5744
* @param consumerRecord the record to format
5845
* @param output the print stream used to output the record
5946
*/
60-
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);
47+
void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);
6148

6249
/**
6350
* Closes the formatter
6451
*/
65-
default public void close() {}
52+
default void close() {}
6653
}

clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

0 commit comments

Comments
 (0)