Browse Source

maintain HW correctly with only 1 replica in ISR; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-420

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1377166 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
31b9d124d6
  1. 6
      core/src/main/scala/kafka/cluster/Partition.scala
  2. 2
      core/src/main/scala/kafka/server/KafkaApis.scala
  3. 4
      core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala

6
core/src/main/scala/kafka/cluster/Partition.scala

@ -131,6 +131,8 @@ class Partition(val topic: String, @@ -131,6 +131,8 @@ class Partition(val topic: String,
leaderEpoch = leaderAndISR.leaderEpoch
zkVersion = leaderAndISR.zkVersion
leaderReplicaIdOpt = Some(localBrokerId)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(getReplica().get)
}
/**
@ -210,7 +212,7 @@ class Partition(val topic: String, @@ -210,7 +212,7 @@ class Partition(val topic: String,
}
}
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
def maybeIncrementLeaderHW(leaderReplica: Replica) {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min
val oldHighWatermark = leaderReplica.highWatermark
@ -232,6 +234,8 @@ class Partition(val topic: String, @@ -232,6 +234,8 @@ class Partition(val topic: String,
info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
updateISR(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
}
case None => // do nothing if no longer leader
}

2
core/src/main/scala/kafka/server/KafkaApis.scala

@ -184,6 +184,8 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -184,6 +184,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
val log = localReplica.log.get
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
// we may need to increment high watermark since ISR could be down to 1
localReplica.partition.maybeIncrementLeaderHW(localReplica)
offsets(msgIndex) = log.logEndOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
trace("%d bytes written to logs, nextAppendOffset = %d"

4
core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala

@ -110,6 +110,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with @@ -110,6 +110,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
val replica = servers.head.replicaManager.getReplica(topic, 0).get
assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
val request = new FetchRequestBuilder()
.correlationId(100)
.clientId("test-client")

Loading…
Cancel
Save