Browse Source

KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168)

Reviewers: Jun Rao <junrao@gmail.com>
pull/6202/head
Dhruvil Shah 6 years ago committed by Jun Rao
parent
commit
ef89cf4eb6
  1. 19
      core/src/main/scala/kafka/cluster/Partition.scala

19
core/src/main/scala/kafka/cluster/Partition.scala

@ -634,16 +634,25 @@ class Partition(val topicPartition: TopicPartition,
leaderReplicaIfLocal match { leaderReplicaIfLocal match {
case Some(leaderReplica) => case Some(leaderReplica) =>
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs) val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
if(outOfSyncReplicas.nonEmpty) { if (outOfSyncReplicas.nonEmpty) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.nonEmpty) assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","), info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
newInSyncReplicas.map(_.brokerId).mkString(","))) .format(inSyncReplicas.map(_.brokerId).mkString(","),
newInSyncReplicas.map(_.brokerId).mkString(","),
leaderReplica.highWatermark.messageOffset,
leaderReplica.logEndOffset.messageOffset,
outOfSyncReplicas.map { replica =>
s"(brokerId: ${replica.brokerId}, endOffset: ${replica.logEndOffset.messageOffset})"
}.mkString(" ")
)
)
// update ISR in zk and in cache // update ISR in zk and in cache
updateIsr(newInSyncReplicas) updateIsr(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
replicaManager.isrShrinkRate.mark() replicaManager.isrShrinkRate.mark()
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica) maybeIncrementLeaderHW(leaderReplica)
} else { } else {
false false

Loading…
Cancel
Save