From 86062e9a78dccad74e012f11755025512ad5cf63 Mon Sep 17 00:00:00 2001 From: Adem Efe Gencer Date: Sun, 5 Nov 2017 18:00:43 -0800 Subject: [PATCH] KAFKA-6157; Fix repeated words words in JavaDoc and comments. Author: Adem Efe Gencer Reviewers: Jiangjie Qin Closes #4170 from efeg/bug/typoFix --- .../org/apache/kafka/clients/InFlightRequests.java | 4 ++-- .../java/org/apache/kafka/clients/NetworkClient.java | 2 +- .../apache/kafka/clients/consumer/KafkaConsumer.java | 8 ++++---- .../apache/kafka/clients/producer/KafkaProducer.java | 2 +- .../kafka/common/record/MemoryRecordsBuilder.java | 2 +- .../authenticator/SaslClientAuthenticator.java | 2 +- .../runtime/distributed/DistributedHerder.java | 2 +- .../org/apache/kafka/connect/util/KafkaBasedLog.java | 2 +- .../connect/storage/KafkaConfigBackingStoreTest.java | 4 ++-- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- .../kafka/coordinator/group/GroupMetadata.scala | 3 ++- .../transaction/TransactionStateManager.scala | 2 +- .../scala/kafka/server/DelayedDeleteRecords.scala | 2 +- core/src/main/scala/kafka/tools/JmxTool.scala | 2 +- .../integration/kafka/api/ProducerBounceTest.scala | 2 +- .../unit/kafka/admin/DeleteConsumerGroupTest.scala | 2 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 2 +- .../scala/unit/kafka/network/SocketServerTest.scala | 2 +- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 2 +- docs/connect.html | 2 +- docs/documentation/streams/architecture.html | 2 +- docs/documentation/streams/core-concepts.html | 2 +- docs/documentation/streams/developer-guide.html | 2 +- docs/documentation/streams/index.html | 2 +- docs/documentation/streams/quickstart.html | 2 +- docs/documentation/streams/tutorial.html | 2 +- docs/documentation/streams/upgrade-guide.html | 2 +- docs/implementation.html | 2 +- .../org/apache/kafka/streams/StreamsBuilder.java | 10 +++++----- .../apache/kafka/streams/kstream/KStreamBuilder.java | 12 ++++++------ .../streams/kstream/internals/KStreamAggregate.java | 2 +- .../streams/kstream/internals/KStreamReduce.java | 2 +- .../kstream/internals/KStreamWindowAggregate.java | 2 +- .../kstream/internals/KStreamWindowReduce.java | 2 +- .../streams/kstream/internals/KTableAggregate.java | 2 +- .../streams/kstream/internals/KTableReduce.java | 2 +- .../streams/processor/internals/StreamTask.java | 2 +- tests/setup.cfg | 2 +- tests/unit/setup.cfg | 2 +- .../org/apache/kafka/tools/ProducerPerformance.java | 2 +- 40 files changed, 55 insertions(+), 54 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index f9773297dbb..3689a09a117 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -60,7 +60,7 @@ final class InFlightRequests { } /** - * Get the oldest request (the one that that will be completed next) for the given node + * Get the oldest request (the one that will be completed next) for the given node */ public NetworkClient.InFlightRequest completeNext(String node) { return requestQueue(node).pollLast(); @@ -167,5 +167,5 @@ final class InFlightRequests { } return nodeIds; } - + } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index ee7258ad2f9..0654a91c8b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -369,7 +369,7 @@ public class NetworkClient implements KafkaClient { if (!isInternalRequest) { // If this request came from outside the NetworkClient, validate // that we can send data. If the request is internal, we trust - // that that internal code has done this validation. Validation + // that internal code has done this validation. Validation // will be slightly different for some internal requests (for // example, ApiVersionsRequests can be sent prior to being in // READY state.) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 05bca2234c6..e9499cbd465 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -423,7 +423,7 @@ import java.util.regex.Pattern; *

* Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically. * In order for this to work, consumers reading from these partitions should be configured to only read committed data. - * This can be achieved by by setting the {@code isolation.level=read_committed} in the consumer's configuration. + * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration. * *

* In read_committed mode, the consumer will read only those transactional messages which have been @@ -704,9 +704,9 @@ public class KafkaConsumer implements Consumer { IsolationLevel isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics); - - int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); - + + int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); + NetworkClient netClient = new NetworkClient( new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext), this.metadata, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 8004180b647..b3cff19114e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -951,7 +951,7 @@ public class KafkaProducer implements Producer { *

*

* Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will - * flush all buffered records before performing the commit. This ensures that all the the {@link #send(ProducerRecord)} + * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)} * calls made since the previous {@link #beginTransaction()} are completed before the commit. *

* diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index ad0bab74d39..a9b57ac22df 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -384,7 +384,7 @@ public class MemoryRecordsBuilder { } /** - * Append a record and return its checksum for message format v0 and v1, or null for for v2 and above. + * Append a record and return its checksum for message format v0 and v1, or null for v2 and above. */ private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index b01ae4cd32c..8b0116563d8 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -186,7 +186,7 @@ public class SaslClientAuthenticator implements Authenticator { if (authenticateVersion != null) saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion())); setSaslState(SaslState.SEND_HANDSHAKE_REQUEST); - // Fall through to send send handshake request with the latest supported version + // Fall through to send handshake request with the latest supported version } case SEND_HANDSHAKE_REQUEST: SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 4d3d07b1023..79d32da65c6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1213,7 +1213,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { public void onRevoked(String leader, Collection connectors, Collection tasks) { log.info("Rebalance started"); - // Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance, + // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance, // it is still important to have a leader that can write configs, offsets, etc. 
if (rebalanceResolved) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 0e190bc3767..de1ceb3be10 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -316,7 +316,7 @@ public class KafkaBasedLog { synchronized (KafkaBasedLog.this) { // Only invoke exactly the number of callbacks we found before triggering the read to log end - // since it is possible for another write + readToEnd to sneak in in the meantime + // since it is possible for another write + readToEnd to sneak in the meantime for (int i = 0; i < numCallbacks; i++) { Callback cb = readLogEndOffsetCallbacks.poll(); cb.onCompletion(null, null); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index e9dd18e1377..aac1b78c918 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -282,7 +282,7 @@ public class KafkaConfigBackingStoreTest { assertNull(configState.taskConfig(TASK_IDS.get(0))); assertNull(configState.taskConfig(TASK_IDS.get(1))); - // Writing task task configs should block until all the writes have been performed and the root record update + // Writing task configs should block until all the writes have been performed and the root record update // has completed List> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1)); configStorage.putTaskConfigs("connector1", taskConfigs); @@ -335,7 +335,7 @@ public class KafkaConfigBackingStoreTest { ClusterConfigState configState = configStorage.snapshot(); assertEquals(-1, configState.offset()); - // Writing task task configs should block until all the writes have been performed and the root record update + // Writing task configs should block until all the writes have been performed and the root record update // has completed List> taskConfigs = Collections.emptyList(); configStorage.putTaskConfigs("connector1", taskConfigs); diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 634d2d543c9..e29467d7e6e 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -325,7 +325,7 @@ class Partition(val topic: String, } /** - * Update the the follower's state in the leader based on the last fetch request. See + * Update the follower's state in the leader based on the last fetch request. See * [[kafka.cluster.Replica#updateLogReadResult]] for details. 
* * @return true if the leader's log start offset or high watermark have been updated diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 901946176b1..42ca6ea0a3f 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -380,7 +380,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) => if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty) throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " + - s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.") + s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.") val currentOffsetOpt = offsets.get(topicPartition) if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) { @@ -405,6 +405,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = { topicPartitions.flatMap { topicPartition => + pendingOffsetCommits.remove(topicPartition) pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) => pendingOffsets.remove(topicPartition) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index b962e82920d..e79a6e3e588 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -601,7 +601,7 @@ class TransactionStateManager(brokerId: Int, val append: Boolean = metadata.inLock { if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) { - // the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR + // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR responseCallback(Errors.NOT_COORDINATOR) false } else { diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala index a6a820223fd..7a00bc1e2da 100644 --- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala +++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala @@ -62,7 +62,7 @@ class DelayedDeleteRecords(delayMs: Long, /** * The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following: * - * 1) There was an error while checking if all replicas have caught up to to the deleteRecordsOffset: set an error in response + * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in response * 2) The low watermark of the partition has caught up to the deleteRecordsOffset. 
set the low watermark in response * */ diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index c1221414af7..4a6a348d9a6 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -67,7 +67,7 @@ object JmxTool extends Logging { .describedAs("format") .ofType(classOf[String]) val jmxServiceUrlOpt = - parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") + parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") .withRequiredArg .describedAs("service-url") .ofType(classOf[String]) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 1bde7b12110..ab13b0a557e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -60,7 +60,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { private val topic1 = "topic-1" /** - * With replication, producer should able able to find new leader after it detects broker failure + * With replication, producer should able to find new leader after it detects broker failure */ @Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837) @Test diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala index a8955f57153..ba11471e488 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala @@ -141,7 +141,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness { AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete) TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist), - "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + "Consumer group info on deleted topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist), "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK") } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 6544d43f6f9..4c1f4ae1bf7 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -211,7 +211,7 @@ class LogManagerTest { log.appendAsLeader(set, leaderEpoch = 0) } time.sleep(logManager.InitialTaskDelayMs) - assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime) + assertTrue("Time based flush should have been triggered", lastFlush != log.lastFlushTime) } /** diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index aebbf5ca686..0c05c469b3c 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed") TestUtils.waitUntilTrue(() => 
openOrClosingChannel.isDefined, "Channel removed without processing staged receives") - // Create new connection with same id when when `channel1` is in Selector.closingChannels + // Create new connection with same id when `channel1` is in Selector.closingChannels // Check that new connection is closed and openOrClosingChannel still contains `channel1` connectAndWaitForConnectionRegister() TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a99cd50d91e..360b9dc3831 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1431,7 +1431,7 @@ object TestUtils extends Logging { private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8) - // Verifies that the record was intended to be committed by checking the the headers for an expected transaction status + // Verifies that the record was intended to be committed by checking the headers for an expected transaction status // If true, this will return the value as a string. It is expected that the record in question should have been created // by the `producerRecordWithExpectedTransactionStatus` method. def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = { diff --git a/docs/connect.html b/docs/connect.html index 886e3480624..78c66b1c23a 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -329,7 +329,7 @@ } -

These are slightly simplified versions, but show that that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the start() method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the stop() method is synchronized. This will be necessary because SourceTasks are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.

+    These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the start() method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the stop() method is synchronized. This will be necessary because SourceTasks are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.

Next, we implement the main functionality of the task, the poll() method which gets events from the input system and returns a List<SourceRecord>:

diff --git a/docs/documentation/streams/architecture.html b/docs/documentation/streams/architecture.html index 6ba69f3e852..ad7b323d077 100644 --- a/docs/documentation/streams/architecture.html +++ b/docs/documentation/streams/architecture.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/documentation/streams/core-concepts.html b/docs/documentation/streams/core-concepts.html index ff46c537b62..d699b795000 100644 --- a/docs/documentation/streams/core-concepts.html +++ b/docs/documentation/streams/core-concepts.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/documentation/streams/developer-guide.html b/docs/documentation/streams/developer-guide.html index e258331537e..88127373ba2 100644 --- a/docs/documentation/streams/developer-guide.html +++ b/docs/documentation/streams/developer-guide.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/documentation/streams/index.html b/docs/documentation/streams/index.html index 1aaaff4b52f..5ff3b3b6fbe 100644 --- a/docs/documentation/streams/index.html +++ b/docs/documentation/streams/index.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/documentation/streams/quickstart.html b/docs/documentation/streams/quickstart.html index f69c0d5ae75..efb0234e240 100644 --- a/docs/documentation/streams/quickstart.html +++ b/docs/documentation/streams/quickstart.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/documentation/streams/tutorial.html b/docs/documentation/streams/tutorial.html index 90f408df3f1..e2cf4016e70 100644 --- a/docs/documentation/streams/tutorial.html +++ b/docs/documentation/streams/tutorial.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/documentation/streams/upgrade-guide.html b/docs/documentation/streams/upgrade-guide.html index 0c687956dcd..b1b32007e51 100644 --- a/docs/documentation/streams/upgrade-guide.html +++ b/docs/documentation/streams/upgrade-guide.html @@ -15,5 +15,5 @@ limitations under the License. --> - + diff --git a/docs/implementation.html b/docs/implementation.html index af234ea626f..8b97aa04649 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -99,7 +99,7 @@ headerValueLength: varint Value: byte[]

-    We use the the same varint encoding as Protobuf. More information on the latter can be found here. The count of headers in a record
+    We use the same varint encoding as Protobuf. More information on the latter can be found here. The count of headers in a record
     is also encoded as a varint.

5.4 Log

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index b5cc6d79c6c..0aac45a20bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -59,7 +59,7 @@ public class StreamsBuilder { final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder; private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); - + /** * Create a {@link KStream} from the specified topics. * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value @@ -237,7 +237,7 @@ public class StreamsBuilder { * If this is not the case the returned {@link KTable} will be corrupted. *

* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * @@ -258,7 +258,7 @@ public class StreamsBuilder { * If this is not the case the returned {@link KTable} will be corrupted. *

* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * @@ -312,7 +312,7 @@ public class StreamsBuilder { * Input {@link KeyValue records} with {@code null} key will be dropped. *

* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

@@ -343,7 +343,7 @@ public class StreamsBuilder { * Input {@link KeyValue records} with {@code null} key will be dropped. *

* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 77745d3113c..d747ce8d049 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -444,7 +444,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * If this is not the case the returned {@link KTable} will be corrupted. *

* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). * @param topic the topic name; cannot be {@code null} @@ -537,7 +537,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * If this is not the case the returned {@link KTable} will be corrupted. *

* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

@@ -714,7 +714,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * If this is not the case the returned {@link KTable} will be corrupted. *

* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

@@ -908,7 +908,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * If this is not the case the returned {@link KTable} will be corrupted. *

* The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

@@ -1007,7 +1007,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * Input {@link KeyValue records} with {@code null} key will be dropped. *

* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

@@ -1196,7 +1196,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB * Input {@link KeyValue records} with {@code null} key will be dropped. *

* The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal - * store name. Note that that store name may not be queriable through Interactive Queries. + * store name. Note that store name may not be queriable through Interactive Queries. * No internal changelog topic is created since the original input topic can be used for recovery (cf. * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}). *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 67b65a38701..b1abdc29de0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -74,7 +74,7 @@ public class KStreamAggregate implements KStreamAggProcessorSupplier implements KStreamAggProcessorSupplier implements KStrea if (oldAgg == null) oldAgg = initializer.apply(); - // try to add the new new value (there will never be old value) + // try to add the new value (there will never be old value) T newAgg = aggregator.apply(key, value, oldAgg); // update the store with the new value diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index c20601af0de..7d02f118f89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -98,7 +98,7 @@ public class KStreamWindowReduce implements KStreamAggPr V oldAgg = entry.value; V newAgg = oldAgg; - // try to add the new new value (there will never be old value) + // try to add the new value (there will never be old value) if (newAgg == null) { newAgg = value; } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index 973de0f7379..0fe3e1f42b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -84,7 +84,7 @@ public class KTableAggregate implements KTableProcessorSupplier implements KTableProcessorSupplier { V oldAgg = store.get(key); V newAgg = oldAgg; - // first try to add the new new value + // first try to add the new value if (value.newValue != null) { if (newAgg == null) { newAgg = value.newValue; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8180b2cf7b1..06f45ed24f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -469,7 +469,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator transactionInFlight = false; } catch (final ProducerFencedException ignore) { /* TODO - * this should actually never happen atm as we we guard the call to #abortTransaction + * this should actually never happen atm as we guard the call to #abortTransaction * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got * fixed and fall-back to this catch-and-swallow code diff --git a/tests/setup.cfg b/tests/setup.cfg index c70f1e498bd..974d5bb9a97 100644 --- a/tests/setup.cfg +++ b/tests/setup.cfg @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+# pytest configuration (can also be defined in tox.ini or pytest.ini file)
 #
 # This file defines naming convention and root search directory for autodiscovery of
 # pytest unit tests for the system test service classes.
diff --git a/tests/unit/setup.cfg b/tests/unit/setup.cfg
index e757a99fb3b..3470da12185 100644
--- a/tests/unit/setup.cfg
+++ b/tests/unit/setup.cfg
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+# pytest configuration (can also be defined in tox.ini or pytest.ini file)
 #
 # To ease possible confusion, prefix muckrake *unit* tests with 'check' instead of 'test', since
 # many muckrake files, classes, and methods have 'test' somewhere in the name
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 0436d67080d..d7572b0f33d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -289,7 +289,7 @@ public class ProducerPerformance {
                 .metavar("TRANSACTION-DURATION")
                 .dest("transactionDurationMs")
                 .setDefault(0L)
-                .help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
+                .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");
 
         return parser;