- throw new RuntimeException(s"$regex is an invalid regex.")
- }
-
- val fetchSize = options.valueOf(fetchSizeOpt).intValue
- val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
- val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
- val reportInterval = options.valueOf(reportIntervalOpt).longValue
- // getting topic metadata
- info("Getting topic metadata...")
- val brokerList = options.valueOf(brokerListOpt)
- ToolsUtils.validatePortOrDie(parser, brokerList)
-
- val (topicsMetadata, brokerInfo) = {
- val adminClient = createAdminClient(brokerList)
- try ((listTopicsMetadata(adminClient), brokerDetails(adminClient)))
- finally CoreUtils.swallow(adminClient.close(), this)
- }
-
- val topicIds = topicsMetadata.map( metadata => metadata.name() -> metadata.topicId()).toMap
-
- val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
- topicsIncludeFilter.isTopicAllowed(topicMetaData.name, false)
- }
-
- if (filteredTopicMetadata.isEmpty) {
- error(s"No topics found. $topicsIncludeOpt if specified, is either filtering out all topics or there is no topic.")
- Exit.exit(1)
- }
-
- val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata =>
- topicMetadata.partitions.asScala.flatMap { partitionMetadata =>
- partitionMetadata.replicas.asScala.map { node =>
- TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id)
- }
- }
- }
- debug(s"Selected topic partitions: $topicPartitionReplicas")
- val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) =>
- brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) }
- }
- debug(s"Topic partitions per broker: $brokerToTopicPartitions")
- val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica =>
- new TopicPartition(replica.topic, replica.partitionId)
- }.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
- debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition")
-
- val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData =>
- topicMetaData.partitions.asScala.map { partitionMetadata =>
- new TopicPartition(topicMetaData.name, partitionMetadata.partition)
- }
- }
-
- val consumerProps = consumerConfig(brokerList)
-
- val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
- initialOffsets(topicPartitions, consumerProps, initialOffsetTime),
- brokerToTopicPartitions.size,
- reportInterval)
- // create all replica fetcher threads
- val verificationBrokerId = brokerToTopicPartitions.head._1
- val counter = new AtomicInteger(0)
- val fetcherThreads = brokerToTopicPartitions.map { case (brokerId, topicPartitions) =>
- new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId",
- sourceBroker = brokerInfo(brokerId),
- topicPartitions = topicPartitions,
- topicIds = topicIds,
- replicaBuffer = replicaBuffer,
- socketTimeout = 30000,
- socketBufferSize = 256000,
- fetchSize = fetchSize,
- maxWait = maxWaitMs,
- minBytes = 1,
- doVerification = brokerId == verificationBrokerId,
- consumerProps,
- fetcherId = counter.incrementAndGet())
- }
-
- Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", {
- info("Stopping all fetchers")
- fetcherThreads.foreach(_.shutdown())
- })
- fetcherThreads.foreach(_.start())
- println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")
-
- }
-
- private def listTopicsMetadata(adminClient: Admin): Seq[TopicDescription] = {
- val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get
- adminClient.describeTopics(topics).allTopicNames.get.values.asScala.toBuffer
- }
-
- private def brokerDetails(adminClient: Admin): Map[Int, Node] = {
- adminClient.describeCluster.nodes.get.asScala.map(n => (n.id, n)).toMap
- }
-
- private def createAdminClient(brokerUrl: String): Admin = {
- val props = new Properties()
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
- Admin.create(props)
- }
-
- private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties,
- initialOffsetTime: Long): collection.Map[TopicPartition, Long] = {
- val consumer = createConsumer(consumerConfig)
- try {
- if (ListOffsetsRequest.LATEST_TIMESTAMP == initialOffsetTime)
- consumer.endOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
- else if (ListOffsetsRequest.EARLIEST_TIMESTAMP == initialOffsetTime)
- consumer.beginningOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
- else {
- val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap
- consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.map { case (k, v) => k -> v.offset }
- }
- } finally consumer.close()
- }
-
- private def consumerConfig(brokerUrl: String): Properties = {
- val properties = new Properties()
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification")
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
- properties
- }
-
- private def createConsumer(consumerConfig: Properties): KafkaConsumer[String, String] =
- new KafkaConsumer(consumerConfig)
-}
-
-private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
-
-private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
-
-private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[TopicPartition, Int],
- initialOffsets: collection.Map[TopicPartition, Long],
- expectedNumFetchers: Int,
- reportInterval: Long) extends Logging {
- private val fetchOffsetMap = new Pool[TopicPartition, Long]
- private val recordsCache = new Pool[TopicPartition, Pool[Int, FetchResponseData.PartitionData]]
- private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
- private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
- @volatile private var lastReportTime = Time.SYSTEM.milliseconds
- private var maxLag: Long = -1L
- private var offsetWithMaxLag: Long = -1L
- private var maxLagTopicAndPartition: TopicPartition = _
- initialize()
-
- def createNewFetcherBarrier(): Unit = {
- fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
- }
-
- def getFetcherBarrier() = fetcherBarrier.get
-
- def createNewVerificationBarrier(): Unit = {
- verificationBarrier.set(new CountDownLatch(1))
- }
-
- def getVerificationBarrier() = verificationBarrier.get
-
- private def initialize(): Unit = {
- for (topicPartition <- expectedReplicasPerTopicPartition.keySet)
- recordsCache.put(topicPartition, new Pool[Int, FetchResponseData.PartitionData])
- setInitialOffsets()
- }
-
-
- private def setInitialOffsets(): Unit = {
- for ((tp, offset) <- initialOffsets)
- fetchOffsetMap.put(tp, offset)
- }
-
- def addFetchedData(topicAndPartition: TopicPartition, replicaId: Int, partitionData: FetchResponseData.PartitionData): Unit = {
- recordsCache.get(topicAndPartition).put(replicaId, partitionData)
- }
-
- def getOffset(topicAndPartition: TopicPartition) = {
- fetchOffsetMap.get(topicAndPartition)
- }
-
- def verifyCheckSum(println: String => Unit): Unit = {
- debug("Begin verification")
- maxLag = -1L
- for ((topicPartition, fetchResponsePerReplica) <- recordsCache) {
- debug(s"Verifying $topicPartition")
- assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition),
- "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected "
- + expectedReplicasPerTopicPartition(topicPartition) + " replicas")
- val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
- replicaId -> FetchResponse.recordsOrFail(fetchResponse).batches.iterator
- }
- val maxHw = fetchResponsePerReplica.values.map(_.highWatermark).max
-
- // Iterate one message at a time from every replica, until high watermark is reached.
- var isMessageInAllReplicas = true
- while (isMessageInAllReplicas) {
- var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
- for ((replicaId, recordBatchIterator) <- recordBatchIteratorMap) {
- try {
- if (recordBatchIterator.hasNext) {
- val batch = recordBatchIterator.next()
-
- // only verify up to the high watermark
- if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).highWatermark)
- isMessageInAllReplicas = false
- else {
- messageInfoFromFirstReplicaOpt match {
- case None =>
- messageInfoFromFirstReplicaOpt = Some(
- MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
- case Some(messageInfoFromFirstReplica) =>
- if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
- println(ReplicaVerificationTool.getCurrentTimeString() + ": partition " + topicPartition
- + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
- + messageInfoFromFirstReplica.offset + " doesn't match replica "
- + replicaId + "'s offset " + batch.lastOffset)
- Exit.exit(1)
- }
- if (messageInfoFromFirstReplica.checksum != batch.checksum)
- println(ReplicaVerificationTool.getCurrentTimeString() + ": partition "
- + topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
- + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
- + "; replica " + replicaId + "'s checksum " + batch.checksum)
- }
- }
- } else
- isMessageInAllReplicas = false
- } catch {
- case t: Throwable =>
- throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
- .format(replicaId, topicPartition, fetchOffsetMap.get(topicPartition)), t)
- }
- }
- if (isMessageInAllReplicas) {
- val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
- fetchOffsetMap.put(topicPartition, nextOffset)
- debug(s"${expectedReplicasPerTopicPartition(topicPartition)} replicas match at offset " +
- s"$nextOffset for $topicPartition")
- }
- }
- if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
- offsetWithMaxLag = fetchOffsetMap.get(topicPartition)
- maxLag = maxHw - offsetWithMaxLag
- maxLagTopicAndPartition = topicPartition
- }
- fetchResponsePerReplica.clear()
- }
- val currentTimeMs = Time.SYSTEM.milliseconds
- if (currentTimeMs - lastReportTime > reportInterval) {
- println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
- + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
- + " among " + recordsCache.size + " partitions")
- lastReportTime = currentTimeMs
- }
- }
-}
-
-private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: Iterable[TopicPartition],
- topicIds: Map[String, Uuid], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
- fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean, consumerConfig: Properties,
- fetcherId: Int)
- extends ShutdownableThread(name) with Logging {
-
- this.logIdent = logPrefix
-
- private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId,
- s"broker-${FetchRequest.DEBUGGING_CONSUMER_ID}-fetcher-$fetcherId")
-
- private val topicNames = topicIds.map(_.swap)
-
- override def doWork(): Unit = {
-
- val fetcherBarrier = replicaBuffer.getFetcherBarrier()
- val verificationBarrier = replicaBuffer.getVerificationBarrier()
-
- val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
- for (topicPartition <- topicPartitions)
- requestMap.put(topicPartition, new FetchRequest.PartitionData(topicIds.getOrElse(topicPartition.topic, Uuid.ZERO_UUID), replicaBuffer.getOffset(topicPartition),
- 0L, fetchSize, Optional.empty()))
-
- val fetchRequestBuilder = FetchRequest.Builder.
- forReplica(ApiKeys.FETCH.latestVersion, FetchRequest.DEBUGGING_CONSUMER_ID, -1, maxWait, minBytes, requestMap)
-
- debug("Issuing fetch request ")
-
- var fetchResponse: FetchResponse = null
- try {
- val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder)
- fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
- } catch {
- case t: Throwable =>
- if (!isRunning)
- throw t
- }
-
- if (fetchResponse != null) {
- fetchResponse.responseData(topicNames.asJava, ApiKeys.FETCH.latestVersion()).forEach { (tp, partitionData) =>
- replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData)
- }
- } else {
- for (topicAndPartition <- topicPartitions)
- replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, FetchResponse.partitionResponse(topicAndPartition.partition, Errors.NONE))
- }
-
- fetcherBarrier.countDown()
- debug("Done fetching")
-
- // wait for all fetchers to finish
- fetcherBarrier.await()
- debug("Ready for verification")
-
- // one of the fetchers will do the verification
- if (doVerification) {
- debug("Do verification")
- replicaBuffer.verifyCheckSum(println)
- replicaBuffer.createNewFetcherBarrier()
- replicaBuffer.createNewVerificationBarrier()
- debug("Created new barrier")
- verificationBarrier.countDown()
- }
-
- verificationBarrier.await()
- debug("Done verification")
- }
-}
-
-private class ReplicaFetcherBlockingSend(sourceNode: Node,
- consumerConfig: ConsumerConfig,
- metrics: Metrics,
- time: Time,
- fetcherId: Int,
- clientId: String) {
-
- private val socketTimeout: Int = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)
-
- private val networkClient = {
- val logContext = new LogContext()
- val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig, time, logContext)
- val selector = new Selector(
- NetworkReceive.UNLIMITED,
- consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
- metrics,
- time,
- "replica-fetcher",
- Map("broker-id" -> sourceNode.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
- false,
- channelBuilder,
- logContext
- )
- new NetworkClient(
- selector,
- new ManualMetadataUpdater(),
- clientId,
- 1,
- 0,
- 0,
- Selectable.USE_DEFAULT_BUFFER_SIZE,
- consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
- consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
- consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
- consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
- time,
- false,
- new ApiVersions,
- logContext
- )
- }
-
- def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
- try {
- if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
- throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
- else {
- val clientRequest = networkClient.newClientRequest(sourceNode.id.toString, requestBuilder,
- time.milliseconds(), true)
- NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
- }
- }
- catch {
- case e: Throwable =>
- networkClient.close(sourceNode.id.toString)
- throw e
- }
- }
-
- def close(): Unit = {
- networkClient.close()
- }
-}
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
deleted file mode 100644
index 217260403df..00000000000
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions.assertTrue
-
-class ReplicaVerificationToolTest {
-
- @Test
- def testReplicaBufferVerifyChecksum(): Unit = {
- val sb = new StringBuilder
-
- val expectedReplicasPerTopicAndPartition = Map(
- new TopicPartition("a", 0) -> 3,
- new TopicPartition("a", 1) -> 3,
- new TopicPartition("b", 0) -> 2
- )
-
- val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, 0)
- expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
- (0 until numReplicas).foreach { replicaId =>
- val records = (0 to 5).map { index =>
- new SimpleRecord(s"key $index".getBytes, s"value $index".getBytes)
- }
- val initialOffset = 4
- val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*)
- val partitionData = new FetchResponseData.PartitionData()
- .setPartitionIndex(tp.partition)
- .setHighWatermark(20)
- .setLastStableOffset(20)
- .setLogStartOffset(0)
- .setRecords(memoryRecords)
-
- replicaBuffer.addFetchedData(tp, replicaId, partitionData)
- }
- }
-
- replicaBuffer.verifyCheckSum(line => sb.append(s"$line\n"))
- val output = sb.toString.trim
-
- // If you change this assertion, you should verify that the replica_verification_test.py system test still passes
- assertTrue(output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions"),
- s"Max lag information should be in output: `$output`")
- }
-
-}
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index 13a1288001f..ecc47b2a6df 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -71,15 +71,14 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
return lag
def start_cmd(self, node):
- cmd = self.path.script("kafka-run-class.sh", node)
- cmd += " %s" % self.java_class_name()
- cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
+ cmd = self.path.script("kafka-replica-verification.sh", node)
+ cmd += " --broker-list %s --topics-include %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &"
return cmd
def stop_node(self, node):
- node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True,
+ node.account.kill_java_processes("ReplicaVerificationTool", clean_shutdown=True,
allow_fail=True)
stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
@@ -87,9 +86,6 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
(str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
- node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False,
+ node.account.kill_java_processes("ReplicaVerificationTool", clean_shutdown=False,
allow_fail=True)
node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False)
-
- def java_class_name(self):
- return "kafka.tools.ReplicaVerificationTool"
diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
new file mode 100644
index 00000000000..446c5fd67bf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.TopicFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketTimeoutException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * For verifying the consistency among replicas.
+ *
+ * 1. start a fetcher on every broker
+ * 2. each fetcher does the following
+ * 2.1 issues fetch request
+ * 2.2 puts the fetched result in a shared buffer
+ * 2.3 waits for all other fetchers to finish step 2.2
+ * 2.4 one of the fetchers verifies the consistency of fetched results among replicas
+ *
+ * The consistency verification is up to the high watermark. The tool reports the
+ * max lag between the verified offset and the high watermark among all partitions.
+ *
+ * If a broker goes down, the verification of the partitions on that broker is delayed
+ * until the broker is up again.
+ *
+ * Caveats:
+ * 1. The tool needs all brokers to be up at startup time.
+ * 2. The tool doesn't handle out of range offsets.
+ */
+public class ReplicaVerificationTool {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicaVerificationTool.class);
+ private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+
+ public static void main(String[] args) {
+ try {
+ ReplicaVerificationToolOptions options = new ReplicaVerificationToolOptions(args);
+ // getting topic metadata
+ LOG.info("Getting topic metadata...");
+ String brokerList = options.brokerHostsAndPorts();
+
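+ // discover topics, topic IDs and the partition-to-replica assignment through the admin client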
+ try (Admin adminClient = createAdminClient(brokerList)) {
+ Collection<TopicDescription> topicsMetadata = listTopicsMetadata(adminClient);
+ Map<Integer, Node> brokerInfo = brokerDetails(adminClient);
+
+ Map<String, Uuid> topicIds = topicsMetadata.stream().collect(Collectors.toMap(TopicDescription::name, TopicDescription::topicId));
+
+ List<TopicDescription> filteredTopicMetadata = topicsMetadata.stream().filter(
+ topicMetadata -> options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)
+ ).collect(Collectors.toList());
+
+ if (filteredTopicMetadata.isEmpty()) {
+ LOG.error("No topics found. {} if specified, is either filtering out all topics or there is no topic.", options.topicsIncludeOpt);
+ Exit.exit(1);
+ }
+
+ List<TopicPartitionReplica> topicPartitionReplicas = filteredTopicMetadata.stream().flatMap(
+ topicMetadata -> topicMetadata.partitions().stream().flatMap(
+ partitionMetadata -> partitionMetadata.replicas().stream().map(
+ node -> new TopicPartitionReplica(topicMetadata.name(), partitionMetadata.partition(), node.id())
+ )
+ )
+ ).collect(Collectors.toList());
+ LOG.debug("Selected topic partitions: {}", topicPartitionReplicas);
+
+ Map<Integer, List<TopicPartition>> brokerToTopicPartitions = topicPartitionReplicas.stream()
+ .collect(Collectors.groupingBy(
+ TopicPartitionReplica::brokerId,
+ Collectors.mapping(
+ replica -> new TopicPartition(replica.topic(), replica.partition()),
+ Collectors.toList()
+ )
+ ));
+ LOG.debug("Topic partitions per broker: {}", brokerToTopicPartitions);
+
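+ // replica count per partition; verifyCheckSum asserts that each round collected a response from every replica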
+ Map<TopicPartition, Integer> expectedReplicasPerTopicPartition = topicPartitionReplicas.stream()
+ .collect(Collectors.groupingBy(
+ replica -> new TopicPartition(replica.topic(), replica.partition()),
+ Collectors.collectingAndThen(
+ Collectors.toList(),
+ List::size
+ )
+ ));
+ LOG.debug("Expected replicas per topic partition: {}", expectedReplicasPerTopicPartition);
+
+ List<TopicPartition> topicPartitions = filteredTopicMetadata.stream()
+ .flatMap(topicMetadata -> topicMetadata.partitions().stream()
+ .map(partitionMetadata -> new TopicPartition(topicMetadata.name(), partitionMetadata.partition()))
+ )
+ .collect(Collectors.toList());
+
+ Properties consumerProps = consumerConfig(brokerList);
+
+ ReplicaBuffer replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
+ initialOffsets(topicPartitions, consumerProps, options.initialOffsetTime()),
+ brokerToTopicPartitions.size(), options.reportInterval());
+
+ // create all replica fetcher threads
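+ // the fetcher assigned to the first broker additionally runs the verification step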
+ int verificationBrokerId = brokerToTopicPartitions.entrySet().iterator().next().getKey();
+ AtomicInteger counter = new AtomicInteger(0);
+ List<ReplicaFetcher> fetcherThreads = brokerToTopicPartitions.entrySet().stream()
+ .map(entry -> {
+ int brokerId = entry.getKey();
+ List<TopicPartition> partitions = entry.getValue();
+ return new ReplicaFetcher(
+ "ReplicaFetcher-" + brokerId,
+ brokerInfo.get(brokerId),
+ partitions,
+ topicIds,
+ replicaBuffer,
+ options.fetchSize(),
+ options.maxWaitMs(),
+ 1,
+ brokerId == verificationBrokerId,
+ consumerProps,
+ counter.incrementAndGet()
+ );
+ })
+ .collect(Collectors.toList());
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ LOG.info("Stopping all fetchers");
+ fetcherThreads.forEach(replicaFetcher -> {
+ try {
+ replicaFetcher.shutdown();
+ } catch (InterruptedException ignored) {
+ }
+ });
+ }, "ReplicaVerificationToolShutdownHook"));
+
+ fetcherThreads.forEach(Thread::start);
+ System.out.printf("%s: verification process is started%n",
+ DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())));
+ }
+ } catch (Throwable e) {
+ System.err.println(e.getMessage());
+ System.err.println(Utils.stackTrace(e));
+ Exit.exit(1);
+ }
+ }
+
+ private static Map<TopicPartition, Long> initialOffsets(List<TopicPartition> topicPartitions, Properties consumerConfig, long initialOffsetTime) {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
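+ // -1 and -2 are the latest/earliest sentinels from ListOffsetsRequest; any other
+ // value is treated as a timestamp and resolved through offsetsForTimes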
+ if (ListOffsetsRequest.LATEST_TIMESTAMP == initialOffsetTime) {
+ return consumer.endOffsets(topicPartitions).entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ } else if (ListOffsetsRequest.EARLIEST_TIMESTAMP == initialOffsetTime) {
+ return consumer.beginningOffsets(topicPartitions).entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ } else {
+ Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
+ .collect(Collectors.toMap(Function.identity(), tp -> initialOffsetTime));
+ return consumer.offsetsForTimes(timestampsToSearch).entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+ }
+ }
+ }
+
+ private static Properties consumerConfig(String brokerUrl) {
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return properties;
+ }
+
+ private static Map<Integer, Node> brokerDetails(Admin adminClient) throws ExecutionException, InterruptedException {
+ return adminClient.describeCluster().nodes().get().stream().collect(Collectors.toMap(Node::id, Function.identity()));
+ }
+
+ private static Collection<TopicDescription> listTopicsMetadata(Admin adminClient) throws ExecutionException, InterruptedException {
+ Set<String> topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
+ return adminClient.describeTopics(topics).allTopicNames().get().values();
+ }
+
+ private static Admin createAdminClient(String brokerList) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return Admin.create(props);
+ }
+
+ private static class ReplicaVerificationToolOptions extends CommandDefaultOptions {
+ private final OptionSpec<String> brokerListOpt;
+ private final OptionSpec<Integer> fetchSizeOpt;
+ private final OptionSpec<Integer> maxWaitMsOpt;
+ private final OptionSpec<String> topicWhiteListOpt;
+ private final OptionSpec<String> topicsIncludeOpt;
+ private final OptionSpec<Long> initialOffsetTimeOpt;
+ private final OptionSpec<Long> reportIntervalOpt;
+
+ ReplicaVerificationToolOptions(String[] args) {
+ super(args);
+ brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+ .withRequiredArg()
+ .describedAs("hostname:port,...,hostname:port")
+ .ofType(String.class);
+ fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
+ .withRequiredArg()
+ .describedAs("bytes")
+ .ofType(Integer.class)
+ .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES);
+ maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+ .withRequiredArg()
+ .describedAs("ms")
+ .ofType(Integer.class)
+ .defaultsTo(1_000);
+ topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; " +
+ "ignored if --topics-include specified. List of topics to verify replica consistency.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class)
+ .defaultsTo(".*");
+ topicsIncludeOpt = parser.accepts("topics-include", "List of topics to verify replica consistency.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class)
+ .defaultsTo(".*");
+ initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
+ .withRequiredArg()
+ .describedAs("timestamp/-1(latest)/-2(earliest)")
+ .ofType(Long.class)
+ .defaultsTo(-1L);
+ reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
+ .withRequiredArg()
+ .describedAs("ms")
+ .ofType(Long.class)
+ .defaultsTo(30_000L);
+ options = parser.parse(args);
+ if (args.length == 0 || options.has(helpOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas for a set of topics have the same data.");
+ }
+ if (options.has(versionOpt)) {
+ CommandLineUtils.printVersionAndExit();
+ }
+ CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt);
+ }
+
+ String brokerHostsAndPorts() {
+ String brokerList = options.valueOf(brokerListOpt);
+ validateBrokerList(parser, brokerList);
+ return brokerList;
+ }
+
+ void validateBrokerList(OptionParser parser, String brokerList) {
+ if (parser == null || brokerList == null) {
+ throw new RuntimeException("No option parser or broker list found");
+ }
+ if (brokerList.isEmpty()) {
+ CommandLineUtils.printUsageAndExit(parser, "Empty broker list option");
+ }
+
+ String[] hostPorts;
+ if (brokerList.contains(",")) hostPorts = brokerList.split(",");
+ else hostPorts = new String[]{brokerList};
+
+ String[] validHostPort = Arrays.stream(hostPorts)
+ .filter(hostPortData -> Utils.getPort(hostPortData) != null)
+ .toArray(String[]::new);
+
+ if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) {
+ CommandLineUtils.printUsageAndExit(parser, "Invalid broker list option");
+ }
+ }
+
+ TopicFilter.IncludeList topicsIncludeFilter() {
+ String regex = options.valueOf(options.has(topicsIncludeOpt) ? topicsIncludeOpt : topicWhiteListOpt);
+ try {
+ Pattern.compile(regex);
+ } catch (PatternSyntaxException e) {
+ throw new RuntimeException(format("%s is an invalid regex", regex));
+ }
+ return new TopicFilter.IncludeList(regex);
+ }
+
+ int fetchSize() {
+ return options.valueOf(fetchSizeOpt);
+ }
+
+ int maxWaitMs() {
+ return options.valueOf(maxWaitMsOpt);
+ }
+
+ long initialOffsetTime() {
+ return options.valueOf(initialOffsetTimeOpt);
+ }
+
+ long reportInterval() {
+ return options.valueOf(reportIntervalOpt);
+ }
+ }
+
+ private static class MessageInfo {
+ final int replicaId;
+ final long offset;
+ final long nextOffset;
+ final long checksum;
+
+ MessageInfo(int replicaId, long offset, long nextOffset, long checksum) {
+ this.replicaId = replicaId;
+ this.offset = offset;
+ this.nextOffset = nextOffset;
+ this.checksum = checksum;
+ }
+ }
+
+ protected static class ReplicaBuffer {
+ private final Map<TopicPartition, Integer> expectedReplicasPerTopicPartition;
+ private final int expectedNumFetchers;
+ private final long reportInterval;
+ private final Map<TopicPartition, Long> fetchOffsetMap;
+ private final Map<TopicPartition, Map<Integer, FetchResponseData.PartitionData>> recordsCache;
+ private final AtomicReference<CountDownLatch> fetcherBarrier;
+ private final AtomicReference<CountDownLatch> verificationBarrier;
+
+ private volatile long lastReportTime;
+ private long maxLag;
+ private long offsetWithMaxLag;
+ private TopicPartition maxLagTopicAndPartition;
+
+ ReplicaBuffer(Map<TopicPartition, Integer> expectedReplicasPerTopicPartition,
+ Map<TopicPartition, Long> initialOffsets,
+ int expectedNumFetchers,
+ long reportInterval) {
+ this.expectedReplicasPerTopicPartition = expectedReplicasPerTopicPartition;
+ this.expectedNumFetchers = expectedNumFetchers;
+ this.reportInterval = reportInterval;
+ this.fetchOffsetMap = new HashMap<>();
+ this.recordsCache = new HashMap<>();
+ this.fetcherBarrier = new AtomicReference<>(new CountDownLatch(expectedNumFetchers));
+ this.verificationBarrier = new AtomicReference<>(new CountDownLatch(1));
+ this.lastReportTime = Time.SYSTEM.milliseconds();
+ this.maxLag = -1L;
+ this.offsetWithMaxLag = -1L;
+
+ for (TopicPartition topicPartition : expectedReplicasPerTopicPartition.keySet()) {
+ recordsCache.put(topicPartition, new HashMap<>());
+ }
+ // set initial offsets
+ for (Map.Entry<TopicPartition, Long> entry : initialOffsets.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ Long offset = entry.getValue();
+ fetchOffsetMap.put(tp, offset);
+ }
+ }
+
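+ // rendezvous protocol: every fetcher counts the fetcher barrier down once it has
+ // deposited its fetch response; the verifying fetcher then runs verifyCheckSum(),
+ // installs fresh latches and releases the others through the verification barrier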
+ void createNewFetcherBarrier() {
+ fetcherBarrier.set(new CountDownLatch(expectedNumFetchers));
+ }
+
+ CountDownLatch getFetcherBarrier() {
+ return fetcherBarrier.get();
+ }
+
+ void createNewVerificationBarrier() {
+ verificationBarrier.set(new CountDownLatch(1));
+ }
+
+ CountDownLatch getVerificationBarrier() {
+ return verificationBarrier.get();
+ }
+
+ void addFetchedData(TopicPartition topicPartition,
+ int replicaId,
+ FetchResponseData.PartitionData partitionData) {
+ recordsCache.get(topicPartition).put(replicaId, partitionData);
+ }
+
+ long getOffset(TopicPartition topicPartition) {
+ return fetchOffsetMap.get(topicPartition);
+ }
+
+ void verifyCheckSum(Consumer<String> println) {
+ LOG.debug("Begin verification");
+ maxLag = -1L;
+
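+ // compare the replicas of each partition batch by batch, stopping as soon as any
+ // replica reaches its high watermark or runs out of fetched batches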
+ for (Map.Entry<TopicPartition, Map<Integer, FetchResponseData.PartitionData>> cacheEntry : recordsCache.entrySet()) {
+ TopicPartition topicPartition = cacheEntry.getKey();
+ Map<Integer, FetchResponseData.PartitionData> fetchResponsePerReplica = cacheEntry.getValue();
+
+ LOG.debug("Verifying {}", topicPartition);
+ assert fetchResponsePerReplica.size() == expectedReplicasPerTopicPartition.get(topicPartition) :
+ "fetched " + fetchResponsePerReplica.size() + " replicas for " + topicPartition +
+ ", but expected " + expectedReplicasPerTopicPartition.get(topicPartition) + " replicas";
+
+ Map<Integer, Iterator<? extends RecordBatch>> recordBatchIteratorMap = new HashMap<>();
+ for (Map.Entry<Integer, FetchResponseData.PartitionData> fetchResEntry : fetchResponsePerReplica.entrySet()) {
+ int replicaId = fetchResEntry.getKey();
+ FetchResponseData.PartitionData fetchResponse = fetchResEntry.getValue();
+ Iterator<? extends RecordBatch> recordIterator =
+ FetchResponse.recordsOrFail(fetchResponse).batches().iterator();
+ recordBatchIteratorMap.put(replicaId, recordIterator);
+ }
+
+ long maxHw = fetchResponsePerReplica.values().stream()
+ .mapToLong(FetchResponseData.PartitionData::highWatermark)
+ .max().orElse(-1L);
+
+ boolean isMessageInAllReplicas = true;
+
+ // iterate one message at a time from every replica, until high watermark is reached
+ while (isMessageInAllReplicas) {
+ Optional<MessageInfo> messageInfoFromFirstReplicaOpt = Optional.empty();
+
+ for (Map.Entry<Integer, Iterator<? extends RecordBatch>> batchEntry : recordBatchIteratorMap.entrySet()) {
+ int replicaId = batchEntry.getKey();
+ Iterator<? extends RecordBatch> recordBatchIterator = batchEntry.getValue();
+
+ try {
+ if (recordBatchIterator.hasNext()) {
+ RecordBatch batch = recordBatchIterator.next();
+
+ // only verify up to the high watermark
+ if (batch.lastOffset() >= fetchResponsePerReplica.get(replicaId).highWatermark()) {
+ isMessageInAllReplicas = false;
+ } else {
+ if (!messageInfoFromFirstReplicaOpt.isPresent()) {
+ messageInfoFromFirstReplicaOpt = Optional.of(
+ new MessageInfo(replicaId, batch.lastOffset(), batch.nextOffset(), batch.checksum())
+ );
+ } else {
+ MessageInfo messageInfoFromFirstReplica = messageInfoFromFirstReplicaOpt.get();
+
+ if (messageInfoFromFirstReplica.offset != batch.lastOffset()) {
+ println.accept(DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())) +
+ ": partition " + topicPartition +
+ ": replica " + messageInfoFromFirstReplica.replicaId +
+ "'s offset " + messageInfoFromFirstReplica.offset +
+ " doesn't match replica " + replicaId +
+ "'s offset " + batch.lastOffset());
+ Exit.exit(1);
+ }
+
+ if (messageInfoFromFirstReplica.checksum != batch.checksum())
+ println.accept(DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())) +
+ ": partition " + topicPartition +
+ " has unmatched checksum at offset " + batch.lastOffset() +
+ "; replica " + messageInfoFromFirstReplica.replicaId +
+ "'s checksum " + messageInfoFromFirstReplica.checksum +
+ "; replica " + replicaId + "'s checksum " + batch.checksum());
+ }
+ }
+ } else {
+ isMessageInAllReplicas = false;
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException("Error in processing replica " + replicaId +
+ " in partition " + topicPartition + " at offset " +
+ fetchOffsetMap.get(topicPartition), t);
+ }
+ }
+
+ if (isMessageInAllReplicas) {
+ long nextOffset = messageInfoFromFirstReplicaOpt.map(messageInfo -> messageInfo.nextOffset).orElse(-1L);
+ fetchOffsetMap.put(topicPartition, nextOffset);
+ LOG.debug("{} replicas match at offset {} for {}",
+ expectedReplicasPerTopicPartition.get(topicPartition), nextOffset, topicPartition);
+ }
+ }
+
+ if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
+ offsetWithMaxLag = fetchOffsetMap.get(topicPartition);
+ maxLag = maxHw - offsetWithMaxLag;
+ maxLagTopicAndPartition = topicPartition;
+ }
+
+ fetchResponsePerReplica.clear();
+ }
+
+ long currentTimeMs = Time.SYSTEM.milliseconds();
+ if (currentTimeMs - lastReportTime > reportInterval) {
+ println.accept(DATE_FORMAT.format(new Date(currentTimeMs)) +
+ ": max lag is " + maxLag + " for partition " +
+ maxLagTopicAndPartition + " at offset " + offsetWithMaxLag +
+ " among " + recordsCache.size() + " partitions");
+ lastReportTime = currentTimeMs;
+ }
+ }
+ }
+
+ private static class ReplicaFetcher extends ShutdownableThread {
+ private final Node sourceBroker;
+ private final Iterable<TopicPartition> topicPartitions;
+ private final Map<String, Uuid> topicIds;
+ private final ReplicaBuffer replicaBuffer;
+ private final int fetchSize;
+ private final int maxWait;
+ private final int minBytes;
+ private final boolean doVerification;
+ private final ReplicaFetcherBlockingSend fetchEndpoint;
+ private final Map<Uuid, String> topicNames;
+
+ public ReplicaFetcher(String name,
+ Node sourceBroker,
+ Iterable<TopicPartition> topicPartitions,
+ Map<String, Uuid> topicIds,
+ ReplicaBuffer replicaBuffer,
+ int fetchSize,
+ int maxWait,
+ int minBytes,
+ boolean doVerification,
+ Properties consumerConfig,
+ int fetcherId) {
+ super(name);
+ this.sourceBroker = sourceBroker;
+ this.topicPartitions = topicPartitions;
+ this.topicIds = topicIds;
+ this.replicaBuffer = replicaBuffer;
+ this.fetchSize = fetchSize;
+ this.maxWait = maxWait;
+ this.minBytes = minBytes;
+ this.doVerification = doVerification;
+ this.fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(),
+ Time.SYSTEM, fetcherId, "broker-" + FetchRequest.DEBUGGING_CONSUMER_ID + "-fetcher-" + fetcherId);
+ this.topicNames = topicIds.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+ }
+
+ @Override
+ public void doWork() {
+ CountDownLatch fetcherBarrier = replicaBuffer.getFetcherBarrier();
+ CountDownLatch verificationBarrier = replicaBuffer.getVerificationBarrier();
+
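+ // build one full fetch request covering every partition hosted on this broker,
+ // resuming each partition from its last verified offset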
+ Map<TopicPartition, FetchRequest.PartitionData> requestMap = new LinkedHashMap<>();
+ for (TopicPartition topicPartition : topicPartitions) {
+ requestMap.put(topicPartition, new FetchRequest.PartitionData(
+ topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID),
+ replicaBuffer.getOffset(topicPartition),
+ 0L,
+ fetchSize,
+ Optional.empty()
+ ));
+ }
+
+ FetchRequest.Builder fetchRequestBuilder = FetchRequest.Builder.forReplica(
+ ApiKeys.FETCH.latestVersion(),
+ FetchRequest.DEBUGGING_CONSUMER_ID,
+ -1,
+ maxWait,
+ minBytes,
+ requestMap
+ );
+
+ LOG.debug("Issuing fetch request");
+
+ FetchResponse fetchResponse = null;
+ try {
+ ClientResponse clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder);
+ fetchResponse = (FetchResponse) clientResponse.responseBody();
+ } catch (Throwable t) {
+ if (!isRunning())
+ throw new RuntimeException(t);
+ }
+
+ if (fetchResponse != null) {
+ fetchResponse.responseData(topicNames, ApiKeys.FETCH.latestVersion()).forEach((tp, partitionData) ->
+ replicaBuffer.addFetchedData(tp, sourceBroker.id(), partitionData));
+ } else {
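+ // the fetch failed while the tool keeps running: record empty responses so that
+ // this round's barriers can still complete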
+ for (TopicPartition topicAndPartition : topicPartitions) {
+ replicaBuffer.addFetchedData(
+ topicAndPartition,
+ sourceBroker.id(),
+ FetchResponse.partitionResponse(topicAndPartition.partition(), Errors.NONE)
+ );
+ }
+ }
+
+ fetcherBarrier.countDown();
+ LOG.debug("Done fetching");
+
+ // wait for all fetchers to finish
+ try {
+ fetcherBarrier.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ LOG.debug("Ready for verification");
+
+ // one of the fetchers will do the verification
+ if (doVerification) {
+ LOG.debug("Do verification");
+ replicaBuffer.verifyCheckSum(System.out::println);
+ replicaBuffer.createNewFetcherBarrier();
+ replicaBuffer.createNewVerificationBarrier();
+ LOG.debug("Created new barrier");
+ verificationBarrier.countDown();
+ }
+
+ try {
+ verificationBarrier.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ LOG.debug("Done verification");
+ }
+ }
+
+ private static class ReplicaFetcherBlockingSend {
+ private final Node sourceNode;
+ private final Time time;
+ private final int socketTimeout;
+ private final NetworkClient networkClient;
+
+ ReplicaFetcherBlockingSend(Node sourceNode,
+ ConsumerConfig consumerConfig,
+ Metrics metrics,
+ Time time,
+ int fetcherId,
+ String clientId) {
+ this.sourceNode = sourceNode;
+ this.time = time;
+ this.socketTimeout = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
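+ // the consumer request timeout also bounds connection establishment in sendRequest()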
+
+ LogContext logContext = new LogContext();
+ ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(consumerConfig, time, logContext);
+ Selector selector = new Selector(
+ NetworkReceive.UNLIMITED,
+ consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+ metrics,
+ time,
+ "replica-fetcher",
+ new HashMap<String, String>() {{
+ put("broker-id", sourceNode.idString());
+ put("fetcher-id", String.valueOf(fetcherId));
+ }},
+ false,
+ channelBuilder,
+ logContext
+ );
+ this.networkClient = new NetworkClient(
+ selector,
+ new ManualMetadataUpdater(),
+ clientId,
+ 1,
+ 0,
+ 0,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
+ consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+ consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+ consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+ time,
+ false,
+ new ApiVersions(),
+ logContext
+ );
+ }
+
+ ClientResponse sendRequest(AbstractRequest.Builder<? extends AbstractRequest> requestBuilder) {
+ try {
+ if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
+ throw new SocketTimeoutException("Failed to connect within " + socketTimeout + " ms");
+ else {
+ ClientRequest clientRequest = networkClient.newClientRequest(sourceNode.idString(),
+ requestBuilder, time.milliseconds(), true);
+ return NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time);
+ }
+ } catch (Throwable e) {
+ networkClient.close(sourceNode.idString());
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ReplicaVerificationToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ReplicaVerificationToolTest.java
new file mode 100644
index 00000000000..b5eb121ba13
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ReplicaVerificationToolTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReplicaVerificationToolTest {
+ @Test
+ void testReplicaBufferVerifyChecksum() {
+ StringBuilder sb = new StringBuilder();
+ final Map<TopicPartition, Integer> expectedReplicasPerTopicAndPartition = new HashMap<TopicPartition, Integer>() {{
+ put(new TopicPartition("a", 0), 3);
+ put(new TopicPartition("a", 1), 3);
+ put(new TopicPartition("b", 0), 2);
+ }};
+
+ ReplicaVerificationTool.ReplicaBuffer replicaBuffer =
+ new ReplicaVerificationTool.ReplicaBuffer(expectedReplicasPerTopicAndPartition, Collections.emptyMap(), 2, 0);
+ expectedReplicasPerTopicAndPartition.forEach((tp, numReplicas) -> {
+ IntStream.range(0, numReplicas).forEach(replicaId -> {
+ SimpleRecord[] records = IntStream.rangeClosed(0, 5)
+ .mapToObj(index -> new SimpleRecord(("key " + index).getBytes(), ("value " + index).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ long initialOffset = 4L;
+ MemoryRecords memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records);
+ FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+ .setPartitionIndex(tp.partition())
+ .setHighWatermark(20)
+ .setLastStableOffset(20)
+ .setLogStartOffset(0)
+ .setRecords(memoryRecords);
+
+ replicaBuffer.addFetchedData(tp, replicaId, partitionData);
+ });
+ });
+
+ replicaBuffer.verifyCheckSum(line -> sb.append(format("%s%n", line)));
+ String output = sb.toString().trim();
+
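+ // six records starting at offset 4 end at offset 9, so the next verified offset is 10;
+ // with a high watermark of 20 the reported max lag is 20 - 10 = 10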
+ // if you change this assertion, you should verify that the replica_verification_test.py system test still passes
+ assertTrue(output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions"),
+ format("Max lag information should be in output: %s", output));
+ }
+}