Browse Source

MINOR: Improve logging for alter log dirs (#6302)

This patch adds several new log messages to provide more information about errors during log dir movement and to make it clear when each partition movement is finished. 

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6350/head
Bob Barrett 6 years ago committed by Jason Gustafson
parent
commit
18b3a878a0
  1. 1
      core/src/main/scala/kafka/log/LogManager.scala
  2. 12
      core/src/main/scala/kafka/server/AbstractFetcherThread.scala
  3. 3
      core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

1
core/src/main/scala/kafka/log/LogManager.scala

@ -796,6 +796,7 @@ class LogManager(logDirs: Seq[File], @@ -796,6 +796,7 @@ class LogManager(logDirs: Seq[File],
val sourceLog = currentLogs.get(topicPartition)
val destLog = futureLogs.get(topicPartition)
info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
if (sourceLog == null)
throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
if (destLog == null)

12
core/src/main/scala/kafka/server/AbstractFetcherThread.scala

@ -118,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, @@ -118,7 +118,7 @@ abstract class AbstractFetcherThread(name: String,
val fetchStates = partitionStates.partitionStateMap.asScala
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
handlePartitionsWithErrors(partitionsWithError)
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
if (fetchRequestOpt.isEmpty) {
trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
@ -134,8 +134,9 @@ abstract class AbstractFetcherThread(name: String, @@ -134,8 +134,9 @@ abstract class AbstractFetcherThread(name: String,
}
// deal with partitions with errors, potentially due to leadership changes
private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String) {
if (partitions.nonEmpty)
debug(s"Handling errors in $methodName for partitions $partitions")
delayPartitions(partitions, fetchBackOffMs)
}
@ -199,7 +200,7 @@ abstract class AbstractFetcherThread(name: String, @@ -199,7 +200,7 @@ abstract class AbstractFetcherThread(name: String,
}
val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets)
handlePartitionsWithErrors(partitionsWithError)
handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
}
@ -224,7 +225,7 @@ abstract class AbstractFetcherThread(name: String, @@ -224,7 +225,7 @@ abstract class AbstractFetcherThread(name: String,
}
}
handlePartitionsWithErrors(partitionsWithError)
handlePartitionsWithErrors(partitionsWithError, "truncateToHighWatermark")
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
@ -366,8 +367,7 @@ abstract class AbstractFetcherThread(name: String, @@ -366,8 +367,7 @@ abstract class AbstractFetcherThread(name: String,
}
if (partitionsWithError.nonEmpty) {
debug(s"Handling errors for partitions $partitionsWithError")
handlePartitionsWithErrors(partitionsWithError)
handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
}
}

3
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala

@ -230,7 +230,8 @@ class ReplicaAlterLogDirsThread(name: String, @@ -230,7 +230,8 @@ class ReplicaAlterLogDirsThread(name: String,
requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset,
fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
} catch {
case _: KafkaStorageException =>
case e: KafkaStorageException =>
debug(s"Failed to build fetch for $tp", e)
partitionsWithError += tp
}

Loading…
Cancel
Save