diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 7f5b16f244e..d9639815244 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -467,7 +467,7 @@ public final class RecordAccumulator {
             abortBatches();
         } while (appendsInProgress());
         // After this point, no thread will append any messages because they will see the close
-        // flag set. We need to do the last abort after no thread was appending in case the there was a new
+        // flag set. We need to do the last abort after no thread was appending in case there was a new
        // batch appended by the last appending thread.
         abortBatches();
         this.batches.clear();
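Reviewer note: the comment corrected above documents the accumulator's close-time race. Below is a minimal sketch of that double-check pattern, using hypothetical class and field names rather than the real RecordAccumulator internals:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the close/abort handshake the corrected comment describes.
// All names here are hypothetical stand-ins for the accumulator's fields.
public class CloseRaceSketch {
    private volatile boolean closed = false;
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    public void append(byte[] record) {
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot append after close");
            // ... buffer the record into the current batch ...
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    public void close() {
        closed = true; // appenders that check the flag from now on fail fast
        do {
            abortBatches();
        } while (appendsInProgress.get() > 0);
        // One last abort: a thread that passed the closed check before the flag
        // was set may have buffered a new batch after the previous abort ran.
        abortBatches();
    }

    private void abortBatches() {
        // fail and release any batches still buffered
    }
}
```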
diff --git a/clients/src/main/java/org/apache/kafka/common/network/LoginType.java b/clients/src/main/java/org/apache/kafka/common/network/LoginType.java
index 9216cb0c5c0..a3a2b279e6b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/LoginType.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/LoginType.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.security.JaasUtils;
 
 /**
  * The type of the login context, it should be SERVER for the broker and CLIENT for the clients (i.e. consumer and
- * producer). It provides the the login context name which defines the section of the JAAS configuration file to be used
+ * producer). It provides the login context name which defines the section of the JAAS configuration file to be used
  * for login.
  */
 public enum LoginType {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 475a4f2c362..248b7ecc51c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -478,7 +478,7 @@ public class Protocol {
                                                                  "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance, or empty if there is no active group)"),
                                                    new Field("protocol_type",
                                                              STRING,
-                                                             "The current group protocol type (will be empty if the there is no active group)"),
+                                                             "The current group protocol type (will be empty if there is no active group)"),
                                                    new Field("protocol",
                                                              STRING,
                                                              "The current group protocol (only provided if the group is Stable)"),
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index 37c056a6928..591644c1cbd 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -299,7 +299,7 @@ public class ConnectSchema implements Schema {
 
     /**
-     * Get the {@link Schema.Type} associated with the the given class.
+     * Get the {@link Schema.Type} associated with the given class.
      *
      * @param klass the Class to
      * @return the corresponding type, nor null if there is no matching type
      */
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index dd4ea88a5ae..232db4ac1d5 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -379,7 +379,7 @@ object TopicCommand extends Logging {
   def shortMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
       "*****************************************************************************************************\n" +
-      "*** WARNING: you are creating a topic where the the max.message.bytes is greater than the consumer ***\n" +
+      "*** WARNING: you are creating a topic where the max.message.bytes is greater than the consumer ***\n" +
      "*** default. This operation is potentially dangerous. Consumers will get failures if their ***\n" +
       "*** fetch.message.max.bytes < the value you are using. ***\n" +
       "*****************************************************************************************************\n" +
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index bd8ec7ec877..155b3fdd942 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -668,7 +668,7 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
-  // Parses without deduplicating keys so the the data can be checked before allowing reassignment to proceed
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
   def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
     Json.parseFull(jsonData) match {
       case Some(m) =>
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3f6a2753a55..8c973a45f41 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -228,7 +228,7 @@ class LogTest extends JUnitSuite {
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
-   * - reading from the the maxOffset should give an empty message set
+   * - reading from the maxOffset should give an empty message set
    * - reading beyond the log end offset should throw an OffsetOutOfRangeException
    */
  @Test
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 193acfd2bb7..69e83c03be8 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -104,7 +104,7 @@ class ClientQuotaManagerTest {
       assertEquals(10, numCallbacks)
       time.sleep(sleepTime)
 
-      // Callback can only be triggered after the the delay time passes
+      // Callback can only be triggered after the delay time passes
       clientMetrics.throttledRequestReaper.doWork()
       assertEquals(0, queueSizeMetric.value().toInt)
       assertEquals(11, numCallbacks)
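Reviewer note: the test comment corrected above asserts that a throttled callback fires only once its delay has elapsed. Below is a rough sketch of that delay-queue idea, with hypothetical ThrottleSketch and ThrottledCallback types (the real ClientQuotaManager drives a similar reaper over a java.util.concurrent.DelayQueue):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch of the behaviour the test asserts: a throttled operation's callback
// runs only after its delay has elapsed. Hypothetical types, not Kafka code.
public class ThrottleSketch {

    static final class ThrottledCallback implements Delayed {
        final long fireAtMs;
        final Runnable callback;

        ThrottledCallback(long delayMs, Runnable callback) {
            this.fireAtMs = System.currentTimeMillis() + delayMs;
            this.callback = callback;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(fireAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<ThrottledCallback> queue = new DelayQueue<>();

    void throttle(long delayMs, Runnable callback) {
        queue.add(new ThrottledCallback(delayMs, callback));
    }

    // One reaper pass, in the spirit of throttledRequestReaper.doWork() in the
    // test: DelayQueue.poll() returns null until an element's delay has
    // expired, so no callback can run early.
    void doWork() {
        ThrottledCallback expired;
        while ((expired = queue.poll()) != null)
            expired.callback.run();
    }
}
```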
diff --git a/docs/streams.html b/docs/streams.html
index 9b94bb32c06..91fda368ae6 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -64,7 +64,7 @@ developers define and connect custom processors as well as to interact with Time
 
-A critical aspect in stream processing is the the notion of time, and how it is modeled and integrated.
+A critical aspect in stream processing is the notion of time, and how it is modeled and integrated.
 For example, some operations such as windowing are defined based on time boundaries.
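Reviewer note: the docs paragraph corrected above introduces the notion of time. Below is a small illustration of the event-time versus processing-time distinction it refers to; SensorReading is a hypothetical type, and Kafka Streams resolves event time through a configurable timestamp extractor rather than a field access like this:

```java
import java.time.Instant;

// Contrast of the two time domains the docs paragraph refers to.
public class TimeSketch {
    record SensorReading(String id, long eventTimeMs, double value) {}

    public static void main(String[] args) {
        SensorReading r = new SensorReading("sensor-1", 1_400_000_000_000L, 21.5);

        long eventTime = r.eventTimeMs();                   // when the event happened at the source
        long processingTime = Instant.now().toEpochMilli(); // when this application sees it

        // Event-time windowing assigns the record to the window that was open
        // when the event occurred, even if the record arrives late.
        long windowSizeMs = 60_000L;
        long windowStart = eventTime - (eventTime % windowSizeMs);
        System.out.printf("event-time window [%d, %d), processed at %d%n",
                windowStart, windowStart + windowSizeMs, processingTime);
    }
}
```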