diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 508dcd0e685..cae47f724e6 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -796,6 +796,7 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) + info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 7f7ed1f1a6e..959c2bfdd36 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -118,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, val fetchStates = partitionStates.partitionStateMap.asScala val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates) - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "maybeFetch") if (fetchRequestOpt.isEmpty) { trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request") @@ -134,8 +134,9 @@ abstract class AbstractFetcherThread(name: String, } // deal with partitions with errors, potentially due to leadership changes - private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) { + private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String) { if (partitions.nonEmpty) + debug(s"Handling errors in $methodName for partitions $partitions") delayPartitions(partitions, fetchBackOffMs) } @@ -199,7 +200,7 @@ abstract class AbstractFetcherThread(name: String, } val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets) - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets") updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets) } } @@ -224,7 +225,7 @@ abstract class AbstractFetcherThread(name: String, } } - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "truncateToHighWatermark") updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets) } @@ -366,8 +367,7 @@ abstract class AbstractFetcherThread(name: String, } if (partitionsWithError.nonEmpty) { - debug(s"Handling errors for partitions $partitionsWithError") - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "processFetchRequest") } } diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 8df234e65a3..7d67cb7d2ab 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -230,7 +230,8 @@ class ReplicaAlterLogDirsThread(name: String, requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch))) } catch { - case _: KafkaStorageException => + case e: KafkaStorageException => + debug(s"Failed to build fetch for $tp", e) partitionsWithError += tp }