
Commit 86062e9

efeg authored and becketqin committed
KAFKA-6157; Fix repeated words words in JavaDoc and comments.
Author: Adem Efe Gencer <[email protected]>
Reviewers: Jiangjie Qin <[email protected]>
Closes #4170 from efeg/bug/typoFix
1 parent 520b313 commit 86062e9

40 files changed, +55 -54 lines changed


clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java (+2 -2)

@@ -60,7 +60,7 @@ private Deque<NetworkClient.InFlightRequest> requestQueue(String node) {
     }
 
     /**
-     * Get the oldest request (the one that that will be completed next) for the given node
+     * Get the oldest request (the one that will be completed next) for the given node
      */
     public NetworkClient.InFlightRequest completeNext(String node) {
         return requestQueue(node).pollLast();
@@ -167,5 +167,5 @@ public List<String> getNodesWithTimedOutRequests(long now, int requestTimeoutMs)
         }
         return nodeIds;
     }
-
+
 }

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (+1 -1)

@@ -369,7 +369,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
         if (!isInternalRequest) {
             // If this request came from outside the NetworkClient, validate
             // that we can send data. If the request is internal, we trust
-            // that that internal code has done this validation. Validation
+            // that internal code has done this validation. Validation
             // will be slightly different for some internal requests (for
             // example, ApiVersionsRequests can be sent prior to being in
             // READY state.)

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (+4 -4)

@@ -423,7 +423,7 @@
  * <p>
  * Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
  * In order for this to work, consumers reading from these partitions should be configured to only read committed data.
- * This can be achieved by by setting the {@code isolation.level=read_committed} in the consumer's configuration.
+ * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
  *
  * <p>
  * In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
@@ -704,9 +704,9 @@ private KafkaConsumer(ConsumerConfig config,
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
-
-            int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
-
+
+            int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                     this.metadata,
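
For context on the JavaDoc fixed above: a minimal sketch (not part of this commit) of a consumer configured to read only committed transactional data, assuming a broker at localhost:9092 and a hypothetical topic and group id; the only line relevant to the corrected sentence is the isolation.level setting.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // hypothetical group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            // The setting described by the corrected JavaDoc: only read committed transactional data.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));    // hypothetical topic
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }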

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (+1 -1)

@@ -951,7 +951,7 @@ private void ensureValidRecordSize(int size) {
      * </p>
      * <p>
      * Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
-     * flush all buffered records before performing the commit. This ensures that all the the {@link #send(ProducerRecord)}
+     * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
      * calls made since the previous {@link #beginTransaction()} are completed before the commit.
      * </p>
      *
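
Similarly, a minimal sketch (not part of this commit) of the transactional-producer flow the corrected JavaDoc describes, assuming a broker at localhost:9092 and hypothetical topic and transactional.id values; no explicit flush() call is made because commitTransaction() flushes buffered records before committing.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");    // hypothetical transactional.id
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("example-topic", "key", "value")); // hypothetical topic
                // No explicit flush() is needed: per the corrected JavaDoc, commitTransaction()
                // flushes all buffered records before performing the commit.
                producer.commitTransaction();
            }
        }
    }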

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (+1 -1)

@@ -384,7 +384,7 @@ private int writeLegacyCompressedWrapperHeader() {
     }
 
     /**
-     * Append a record and return its checksum for message format v0 and v1, or null for for v2 and above.
+     * Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
      */
     private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                   ByteBuffer value, Header[] headers) {

clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java (+1 -1)

@@ -186,7 +186,7 @@ public void authenticate() throws IOException {
                 if (authenticateVersion != null)
                     saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
                 setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
-                // Fall through to send send handshake request with the latest supported version
+                // Fall through to send handshake request with the latest supported version
             }
             case SEND_HANDSHAKE_REQUEST:
                 SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java (+1 -1)

@@ -1213,7 +1213,7 @@ public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
         public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
             log.info("Rebalance started");
 
-            // Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance,
+            // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
             // it is still important to have a leader that can write configs, offsets, etc.
 
             if (rebalanceResolved) {

connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java (+1 -1)

@@ -316,7 +316,7 @@ public void run() {
 
                     synchronized (KafkaBasedLog.this) {
                         // Only invoke exactly the number of callbacks we found before triggering the read to log end
-                        // since it is possible for another write + readToEnd to sneak in in the meantime
+                        // since it is possible for another write + readToEnd to sneak in the meantime
                         for (int i = 0; i < numCallbacks; i++) {
                             Callback<Void> cb = readLogEndOffsetCallbacks.poll();
                             cb.onCompletion(null, null);

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java (+2 -2)

@@ -282,7 +282,7 @@ public void testPutTaskConfigs() throws Exception {
         assertNull(configState.taskConfig(TASK_IDS.get(0)));
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
 
-        // Writing task task configs should block until all the writes have been performed and the root record update
+        // Writing task configs should block until all the writes have been performed and the root record update
         // has completed
         List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
         configStorage.putTaskConfigs("connector1", taskConfigs);
@@ -335,7 +335,7 @@ public void testPutTaskConfigsZeroTasks() throws Exception {
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(-1, configState.offset());
 
-        // Writing task task configs should block until all the writes have been performed and the root record update
+        // Writing task configs should block until all the writes have been performed and the root record update
         // has completed
         List<Map<String, String>> taskConfigs = Collections.emptyList();
         configStorage.putTaskConfigs("connector1", taskConfigs);

core/src/main/scala/kafka/cluster/Partition.scala (+1 -1)

@@ -325,7 +325,7 @@ class Partition(val topic: String,
   }
 
   /**
-   * Update the the follower's state in the leader based on the last fetch request. See
+   * Update the follower's state in the leader based on the last fetch request. See
    * [[kafka.cluster.Replica#updateLogReadResult]] for details.
    *
    * @return true if the leader's log start offset or high watermark have been updated

core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala (+2 -1)

@@ -380,7 +380,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
       if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
         throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
-          s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
+          s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")
 
       val currentOffsetOpt = offsets.get(topicPartition)
       if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
@@ -405,6 +405,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
     topicPartitions.flatMap { topicPartition =>
+
       pendingOffsetCommits.remove(topicPartition)
       pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
         pendingOffsets.remove(topicPartition)

core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala (+1 -1)

@@ -601,7 +601,7 @@ class TransactionStateManager(brokerId: Int,
 
       val append: Boolean = metadata.inLock {
         if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
-          // the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR
+          // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
           responseCallback(Errors.NOT_COORDINATOR)
           false
         } else {

core/src/main/scala/kafka/server/DelayedDeleteRecords.scala (+1 -1)

@@ -62,7 +62,7 @@ class DelayedDeleteRecords(delayMs: Long,
   /**
    * The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following:
    *
-   * 1) There was an error while checking if all replicas have caught up to to the deleteRecordsOffset: set an error in response
+   * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in response
    * 2) The low watermark of the partition has caught up to the deleteRecordsOffset. set the low watermark in response
    *
    */

core/src/main/scala/kafka/tools/JmxTool.scala (+1 -1)

@@ -67,7 +67,7 @@ object JmxTool extends Logging {
       .describedAs("format")
       .ofType(classOf[String])
     val jmxServiceUrlOpt =
-      parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
+      parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
         .withRequiredArg
         .describedAs("service-url")
         .ofType(classOf[String])

core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala (+1 -1)

@@ -60,7 +60,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
 
   /**
-   * With replication, producer should able able to find new leader after it detects broker failure
+   * With replication, producer should able to find new leader after it detects broker failure
    */
   @Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837)
   @Test

core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala (+1 -1)

@@ -141,7 +141,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete)
 
     TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
-      "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+      "Consumer group info on deleted topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
     TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
       "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
   }

core/src/test/scala/unit/kafka/log/LogManagerTest.scala (+1 -1)

@@ -211,7 +211,7 @@ class LogManagerTest {
       log.appendAsLeader(set, leaderEpoch = 0)
     }
     time.sleep(logManager.InitialTaskDelayMs)
-    assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
+    assertTrue("Time based flush should have been triggered", lastFlush != log.lastFlushTime)
   }
 
   /**

core/src/test/scala/unit/kafka/network/SocketServerTest.scala (+1 -1)

@@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite {
     TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed")
     TestUtils.waitUntilTrue(() => openOrClosingChannel.isDefined, "Channel removed without processing staged receives")
 
-    // Create new connection with same id when when `channel1` is in Selector.closingChannels
+    // Create new connection with same id when `channel1` is in Selector.closingChannels
     // Check that new connection is closed and openOrClosingChannel still contains `channel1`
     connectAndWaitForConnectionRegister()
     TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel")

core/src/test/scala/unit/kafka/utils/TestUtils.scala (+1 -1)

@@ -1431,7 +1431,7 @@ object TestUtils extends Logging {
 
   private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8)
 
-  // Verifies that the record was intended to be committed by checking the the headers for an expected transaction status
+  // Verifies that the record was intended to be committed by checking the headers for an expected transaction status
   // If true, this will return the value as a string. It is expected that the record in question should have been created
   // by the `producerRecordWithExpectedTransactionStatus` method.
   def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {

docs/connect.html (+1 -1)

@@ -329,7 +329,7 @@ <h5><a id="connect_taskexample" href="#connect_taskexample">Task Example - Sourc
     }
 </pre>
 
-    <p>These are slightly simplified versions, but show that that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
+    <p>These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
 
     <p>Next, we implement the main functionality of the task, the <code>poll()</code> method which gets events from the input system and returns a <code>List&lt;SourceRecord&gt;</code>:</p>
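
As a companion to the corrected paragraph, a minimal SourceTask sketch (not part of this commit; the class, topic, and offset names are hypothetical) showing the shape it describes: start() and stop() only manage resources, and stop() is synchronized because it is called from a different thread than the one that may be blocked in poll().

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public class ExampleSourceTask extends SourceTask {
        private volatile boolean stopping = false;

        @Override
        public String version() {
            return "0.0.1"; // placeholder version string
        }

        @Override
        public void start(Map<String, String> props) {
            // Only allocate resources here (open files, connections, etc.), as the doc describes.
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (stopping)
                return null;
            Thread.sleep(1000); // stand-in for blocking on the external system
            Map<String, ?> sourcePartition = Collections.singletonMap("source", "example");
            Map<String, ?> sourceOffset = Collections.singletonMap("position", 0L);
            return Collections.singletonList(
                    new SourceRecord(sourcePartition, sourceOffset, "example-topic",
                                     Schema.STRING_SCHEMA, "hello"));
        }

        @Override
        public synchronized void stop() {
            // Called from a different thread (hence synchronized, as the doc notes); free resources here.
            stopping = true;
        }
    }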

docs/documentation/streams/architecture.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/architecture.html" -->

docs/documentation/streams/core-concepts.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/core-concepts.html" -->

docs/documentation/streams/developer-guide.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/developer-guide.html" -->

docs/documentation/streams/index.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/index.html" -->

docs/documentation/streams/quickstart.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/quickstart.html" -->

docs/documentation/streams/tutorial.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/tutorial.html" -->

docs/documentation/streams/upgrade-guide.html (+1 -1)

@@ -15,5 +15,5 @@
    limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/upgrade-guide.html" -->

docs/implementation.html (+1 -1)

@@ -99,7 +99,7 @@ <h5><a id="recordheader" href="#recordheader">5.4.2.1 Record Header</a></h5>
         headerValueLength: varint
         Value: byte[]
 </pre></p>
-<p>We use the the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
+<p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
 is also encoded as a varint.</p>
 
 <h3><a id="log" href="#log">5.4 Log</a></h3>
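
For readers unfamiliar with the encoding the corrected sentence refers to, here is a small illustrative sketch of zig-zag, base-128 varint encoding as used by Protobuf; it is an assumption-level example, not Kafka's actual ByteUtils implementation.

    import java.io.ByteArrayOutputStream;

    public class VarintSketch {
        // Zig-zag plus base-128 varint encoding, the scheme the doc says is shared with Protobuf.
        static byte[] writeVarint(int value) {
            int v = (value << 1) ^ (value >> 31);      // zig-zag: map signed values to small unsigned ones
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            while ((v & 0xFFFFFF80) != 0) {            // while more than 7 bits remain
                out.write((v & 0x7F) | 0x80);          // emit low 7 bits with the continuation bit set
                v >>>= 7;
            }
            out.write(v);                              // final byte, continuation bit clear
            return out.toByteArray();
        }

        public static void main(String[] args) {
            for (int n : new int[]{0, 1, -1, 300})
                System.out.printf("%d -> %d byte(s)%n", n, writeVarint(n).length);
        }
    }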

streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java (+5 -5)

@@ -59,7 +59,7 @@ public class StreamsBuilder {
     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
 
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
-
+
     /**
      * Create a {@link KStream} from the specified topics.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -237,7 +237,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     *
@@ -258,7 +258,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic) {
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     *
@@ -312,7 +312,7 @@ public synchronized <K, V> KTable<K, V> table(final String topic,
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -343,7 +343,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
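
As context for the corrected JavaDoc, a minimal sketch (not part of this commit) contrasting a table materialized under an internal store name with one given an explicit store name via Materialized.as, which makes the store addressable for Interactive Queries; the topic and store names are hypothetical.

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TableStoreNameSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Materialized under an internal store name; per the corrected JavaDoc, that name
            // may not be queriable through Interactive Queries.
            KTable<String, String> internalName = builder.table("example-input-topic");

            // Supplying an explicit store name makes the state store addressable for queries.
            KTable<String, String> explicitName = builder.table(
                    "example-input-topic",
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("example-store"));
        }
    }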
