diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ca3abbbc973..e731111ab48 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -634,16 +634,25 @@ class Partition(val topicPartition: TopicPartition,
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
-          if(outOfSyncReplicas.nonEmpty) {
+          if (outOfSyncReplicas.nonEmpty) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.nonEmpty)
-            info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
-              newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
+              .format(inSyncReplicas.map(_.brokerId).mkString(","),
+                newInSyncReplicas.map(_.brokerId).mkString(","),
+                leaderReplica.highWatermark.messageOffset,
+                leaderReplica.logEndOffset.messageOffset,
+                outOfSyncReplicas.map { replica =>
+                  s"(brokerId: ${replica.brokerId}, endOffset: ${replica.logEndOffset.messageOffset})"
+                }.mkString(" ")
+              )
+            )
+
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
-            // we may need to increment high watermark since ISR could be down to 1
-            replicaManager.isrShrinkRate.mark()
+
+            // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
           } else {
             false