
KAFKA-510 Broker needs to know replication factor per partition; patched by Yang Ye; reviewed by Neha Narkhede, Jun Rao and Joel Koshy

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1397372 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Neha Narkhede 12 years ago
parent commit eb06b93f97
  1. core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (2 changed lines)
  2. core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (68 changed lines)
  3. core/src/main/scala/kafka/cluster/Partition.scala (9 changed lines)
  4. core/src/main/scala/kafka/cluster/Replica.scala (2 changed lines)
  5. core/src/main/scala/kafka/controller/ControllerChannelManager.scala (14 changed lines)
  6. core/src/main/scala/kafka/controller/PartitionStateMachine.scala (9 changed lines)
  7. core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (14 changed lines)
  8. core/src/main/scala/kafka/server/KafkaApis.scala (18 changed lines)
  9. core/src/main/scala/kafka/server/ReplicaManager.scala (68 changed lines)
  10. core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (5 changed lines)
  11. core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (6 changed lines)
  12. core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (2 changed lines)
  13. core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (2 changed lines)
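In short: the controller now tells each broker the replication factor of every partition it manages. LeaderAndIsrRequest no longer maps (topic, partition) to a bare LeaderAndIsr; it maps to a PartitionStateInfo that bundles the LeaderAndIsr with the partition's replication factor, and Partition stores that value so under-replication is judged per partition rather than against the broker-wide default. A minimal sketch of the idea, in simplified Scala (illustrative stand-ins, not the exact classes in the diffs below):

// Sketch only: simplified shapes mirroring the change in this commit.
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
case class PartitionStateInfo(leaderAndIsr: LeaderAndIsr, replicationFactor: Int)

// Before: ISR size was compared against config.defaultReplicationFactor.
// After: each partition carries its own replication factor.
def isUnderReplicated(inSyncReplicaCount: Int, replicationFactor: Int): Boolean =
  inSyncReplicaCount < replicationFactor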

core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (2 changed lines)

@@ -44,7 +44,7 @@ object LeaderAndISRResponse {
case class LeaderAndISRResponse(versionId: Short,
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse{
extends RequestOrResponse {
def sizeInBytes(): Int ={
var size = 2 + 2 + 4
for ((key, value) <- responseMap){

core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (68 changed lines)

@@ -27,38 +27,46 @@ import collection.mutable.HashMap
object LeaderAndIsr {
val initialLeaderEpoch: Int = 0
val initialZKVersion: Int = 0
def readFrom(buffer: ByteBuffer): LeaderAndIsr = {
}
case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
override def toString(): String = {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leader.toString)
jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
jsonDataMap.put("ISR", isr.mkString(","))
Utils.stringMapToJsonString(jsonDataMap)
}
}
object PartitionStateInfo {
def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
val leader = buffer.getInt
val leaderGenId = buffer.getInt
val ISRString = Utils.readShortString(buffer, "UTF-8")
val ISR = ISRString.split(",").map(_.toInt).toList
val zkVersion = buffer.getInt
new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion)
val replicationFactor = buffer.getInt
PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor)
}
}
case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){
def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFactor: Int) {
def writeTo(buffer: ByteBuffer) {
buffer.putInt(leader)
buffer.putInt(leaderEpoch)
Utils.writeShortString(buffer, isr.mkString(","), "UTF-8")
buffer.putInt(zkVersion)
buffer.putInt(leaderAndIsr.leader)
buffer.putInt(leaderAndIsr.leaderEpoch)
Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8")
buffer.putInt(leaderAndIsr.zkVersion)
buffer.putInt(replicationFactor)
}
def sizeInBytes(): Int = {
val size = 4 + 4 + (2 + isr.mkString(",").length) + 4
val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
size
}
override def toString(): String = {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leader.toString)
jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
jsonDataMap.put("ISR", isr.mkString(","))
Utils.stringMapToJsonString(jsonDataMap)
}
}
@@ -73,17 +81,17 @@ object LeaderAndIsrRequest {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val leaderAndISRRequestCount = buffer.getInt
val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
val partitionStateInfosCount = buffer.getInt
val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
for(i <- 0 until leaderAndISRRequestCount){
for(i <- 0 until partitionStateInfosCount){
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt
val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer)
val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
partitionStateInfos.put((topic, partition), partitionStateInfo)
}
new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos)
new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos)
}
}
@@ -91,19 +99,19 @@ object LeaderAndIsrRequest {
case class LeaderAndIsrRequest (versionId: Short,
clientId: String,
ackTimeoutMs: Int,
leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
partitionStateInfos: Map[(String, Int), PartitionStateInfo])
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo]) = {
this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(leaderAndISRInfos.size)
for((key, value) <- leaderAndISRInfos){
buffer.putInt(partitionStateInfos.size)
for((key, value) <- partitionStateInfos){
Utils.writeShortString(buffer, key._1, "UTF-8")
buffer.putInt(key._2)
value.writeTo(buffer)
@@ -112,7 +120,7 @@ case class LeaderAndIsrRequest (versionId: Short,
def sizeInBytes(): Int = {
var size = 1 + 2 + (2 + clientId.length) + 4 + 4
for((key, value) <- leaderAndISRInfos)
for((key, value) <- partitionStateInfos)
size += (2 + key._1.length) + 4 + value.sizeInBytes
size
}
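For reference, the per-partition payload that PartitionStateInfo.writeTo and readFrom handle above is: leader (4 bytes), leaderEpoch (4 bytes), the ISR as a length-prefixed comma-separated string, zkVersion (4 bytes), and the new replicationFactor (4 bytes), which is exactly the 4 + 4 + (2 + ISR string length) + 4 + 4 accounting in sizeInBytes. A hedged round-trip sketch using the classes from this file (assumes the buffer is sized from sizeInBytes and rewound before reading):

import java.nio.ByteBuffer

// Sketch only: serialize one PartitionStateInfo and read it back.
val info = PartitionStateInfo(new LeaderAndIsr(0, List(0, 1, 2)), replicationFactor = 3)
val buffer = ByteBuffer.allocate(info.sizeInBytes())
info.writeTo(buffer)
buffer.rewind()
val decoded = PartitionStateInfo.readFrom(buffer)
assert(decoded.replicationFactor == 3 && decoded.leaderAndIsr.isr == List(0, 1, 2))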

core/src/main/scala/kafka/cluster/Partition.scala (9 changed lines)

@@ -21,15 +21,17 @@ import kafka.utils._
import java.lang.Object
import kafka.api.LeaderAndIsr
import kafka.server.ReplicaManager
import kafka.common.ErrorMapping
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.common.ErrorMapping
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
*/
class Partition(val topic: String,
val partitionId: Int,
var replicationFactor: Int,
time: Time,
val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
private val localBrokerId = replicaManager.config.brokerId
@@ -57,8 +59,7 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
// TODO: need to pass in replication factor from controller
inSyncReplicas.size < replicaManager.config.defaultReplicationFactor
inSyncReplicas.size < replicationFactor
}
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -292,7 +293,7 @@ class Partition(val topic: String,
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", ")))
val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion)
if (updateSucceeded){
inSyncReplicas = newISR
zkVersion = newVersion

core/src/main/scala/kafka/cluster/Replica.scala (2 changed lines)

@@ -67,7 +67,7 @@ class Replica(val brokerId: Int,
def highWatermark_=(newHighWatermark: Long) {
if (isLocal) {
trace("Setting hw for replica %d topic %s partition %d on broker %d to %d"
.format(brokerId, topic, partitionId, newHighWatermark))
.format(brokerId, topic, partitionId, brokerId, newHighWatermark))
highWatermarkValue.set(newHighWatermark)
} else
throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local"

core/src/main/scala/kafka/controller/ControllerChannelManager.scala (14 changed lines)

@@ -139,7 +139,7 @@ class RequestSendThread(val controllerId: Int,
class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
extends Logging {
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
def newBatch() {
@@ -151,10 +151,12 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
stopReplicaRequestMap.clear()
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
brokerIds.foreach { brokerId =>
leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
leaderAndIsrRequestMap(brokerId).put((topic, partition), leaderAndIsr)
leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
new mutable.HashMap[(String, Int), PartitionStateInfo])
leaderAndIsrRequestMap(brokerId).put((topic, partition),
PartitionStateInfo(leaderAndIsr, replicationFactor))
}
}
@@ -168,8 +170,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
def sendRequestsToBrokers() {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val leaderAndIsr = m._2
val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
val partitionStateInfos = m._2
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos)
debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
sendRequest(broker, leaderAndIsrRequest, null)
}

core/src/main/scala/kafka/controller/PartitionStateMachine.scala (9 changed lines)

@@ -234,14 +234,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition, liveAssignedReplicas))
// make the first replica in the list of assigned replicas, the leader
val leader = liveAssignedReplicas.head
var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
try {
ZkUtils.createPersistentPath(controllerContext.zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString())
// NOTE: the above write can fail only if the current controller lost its zk session and the new controller
// took over and initialized this partition. This can happen if the current controller went into a long
// GC pause
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr, replicaAssignment.size)
controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
partitionState.put((topic, partition), OnlinePartition)
}catch {
@@ -283,7 +283,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
// store new leader and isr info in cache
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition,
topic, partition, newLeaderAndIsr, controllerContext.partitionReplicaAssignment((topic, partition)).size)
}catch {
case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
.format(topic, partition) + " Marking this partition offline", poe)

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (14 changed lines)

@@ -43,7 +43,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
private val zkClient = controllerContext.zkClient
var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
private var isShuttingDown = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
/**
* Invoked on successful controller election. First registers a broker change listener since that triggers all
@@ -101,6 +101,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
try {
replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
targetState match {
case NewReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
@@ -111,7 +112,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
if(leaderAndIsr.leader == replicaId)
throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
.format(replicaId, topic, partition) + "state as it is being requested to become leader")
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderAndIsr, replicaAssignment.size)
case None => // new leader request will be sent to this replica when one gets elected
}
replicaState.put((topic, partition, replicaId), NewReplica)
@@ -143,7 +145,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case Some(leaderAndIsr) =>
controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
case true => // leader is alive
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderAndIsr, replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OnlineReplica)
info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
case false => // ignore partitions whose leader is not alive
@@ -167,7 +170,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
// update the new leadership decision in zookeeper or retry
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion
zookeeperPathUpdateSucceeded = updateSucceeded
@@ -176,7 +179,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
}
// send the shrunk ISR state change request only to the leader
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader),
topic, partition, newLeaderAndIsr, replicaAssignment.size)
// update the local leader and isr cache
controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
replicaState.put((topic, partition, replicaId), OfflineReplica)

core/src/main/scala/kafka/server/KafkaApis.scala (18 changed lines)

@@ -17,7 +17,6 @@
package kafka.server
import java.io.IOException
import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._
import kafka.message._
@@ -26,7 +25,6 @@ import kafka.utils.{Pool, SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
import scala.math._
import kafka.network.RequestChannel.Response
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
@@ -34,6 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
import org.I0Itec.zkclient.ZkClient
import kafka.common._
/**
* Logic to handle the various Kafka requests
*/
@@ -132,10 +131,14 @@ class KafkaApis(val requestChannel: RequestChannel,
produceRequest.data.foreach(partitionAndData =>
maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
val allPartitionHaveReplicationFactorOne =
!produceRequest.data.keySet.exists(
m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
if (produceRequest.requiredAcks == 0 ||
produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
numPartitionsInError == produceRequest.numPartitions) {
allPartitionHaveReplicationFactorOne ||
numPartitionsInError == produceRequest.numPartitions){
val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -517,8 +520,13 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
.format(topic, partitionId, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) {
val partition = replicaManager.getOrCreatePartition(topic, partitionId)
val (hasEnough, errorCode) = partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
val partitionOpt = replicaManager.getPartition(topic, partitionId)
val (hasEnough, errorCode) = partitionOpt match {
case Some(partition) =>
partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
case None =>
(false, ErrorMapping.UnknownTopicOrPartitionCode)
}
if (errorCode != ErrorMapping.NoError) {
fetchPartitionStatus.acksPending = false
fetchPartitionStatus.error = errorCode
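The handleProducerRequest change above uses the per-partition replication factor to avoid parking requests in purgatory when there are no followers to wait for: if every partition touched by the request has replication factor one, the response can be sent immediately even for requiredAcks beyond 1. An unknown partition reports -1, which fails the check and keeps the request waiting. A hedged restatement with a hypothetical standalone helper (the real check is inline in KafkaApis):

// Hypothetical helper, sketch only.
def respondWithoutWaiting(requiredAcks: Short,
                          numPartitionsInError: Int,
                          numPartitions: Int,
                          replicationFactors: Seq[Int]): Boolean =
  requiredAcks == 0 ||
    requiredAcks == 1 ||
    replicationFactors.forall(_ == 1) || // -1 for an unknown partition keeps the request in purgatory
    numPartitionsInError == numPartitions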

core/src/main/scala/kafka/server/ReplicaManager.scala (68 changed lines)

@@ -22,11 +22,12 @@ import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils._
import kafka.log.LogManager
import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
object ReplicaManager {
val UnknownLogEndOffset = -1L
@@ -39,7 +40,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
private val leaderPartitionsLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
@@ -69,6 +69,20 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
}
/**
* This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest().
* In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory
*/
def getReplicationFactorForPartition(topic: String, partitionId: Int) = {
val partitionOpt = getPartition(topic, partitionId)
partitionOpt match {
case Some(partition) =>
partition.replicationFactor
case None =>
-1
}
}
def startup() {
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
@@ -93,10 +107,10 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
errorCode
}
def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
var partition = allPartitions.get((topic, partitionId))
if (partition == null) {
allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
partition = allPartitions.get((topic, partitionId))
}
partition
@@ -125,10 +139,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
}
}
def getOrCreateReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Replica = {
getOrCreatePartition(topic, partitionId).getOrCreateReplica(replicaId)
}
def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
val partitionOpt = getPartition(topic, partitionId)
partitionOpt match {
@@ -141,23 +151,23 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
info("Handling leader and isr request %s".format(leaderAndISRRequest))
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
var errorCode = ErrorMapping.NoError
val topic = partitionInfo._1
val partitionId = partitionInfo._2
val topic = topicAndPartition._1
val partitionId = topicAndPartition._2
val requestedLeaderId = leaderAndISR.leader
val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
try {
if(requestedLeaderId == config.brokerId)
makeLeader(topic, partitionId, leaderAndISR)
makeLeader(topic, partitionId, partitionStateInfo)
else
makeFollower(topic, partitionId, leaderAndISR)
makeFollower(topic, partitionId, partitionStateInfo)
} catch {
case e =>
error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
}
responseMap.put(partitionInfo, errorCode)
responseMap.put(topicAndPartition, errorCode)
}
/**
@@ -167,7 +177,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
*/
// if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
// startHighWaterMarksCheckPointThread
// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
// info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
// partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
// }
@@ -175,10 +185,11 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
responseMap
}
private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
val leaderAndIsr = partitionStateInfo.leaderAndIsr
info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId)
if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) {
// also add this partition to the list of partitions for which the leader is the current broker
leaderPartitionsLock synchronized {
leaderPartitions += partition
@@ -187,13 +198,14 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
}
private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
val leaderBrokerId: Int = leaderAndISR.leader
private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) {
val leaderAndIsr = partitionStateInfo.leaderAndIsr
val leaderBrokerId: Int = leaderAndIsr.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(leaderBrokerId, topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId)
if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) {
// remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized {
leaderPartitions -= partition
@@ -209,8 +221,12 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
}
def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
val partition = getOrCreatePartition(topic, partitionId)
partition.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
val partitionOpt = getPartition(topic, partitionId)
if(partitionOpt.isDefined){
partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
} else {
warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
}
}
/**
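Two ReplicaManager changes above are worth noting: getOrCreatePartition now requires the replication factor, so a Partition is never materialized without one, and getReplicationFactorForPartition returns -1 when the partition has not been created locally (per the comment, that sentinel steers the producer path into purgatory rather than an early ack). A hedged usage sketch, assuming an already constructed ReplicaManager named replicaManager, as in the updated unit tests:

// Sketch only: callers now pass the replication factor when creating a partition.
val partition = replicaManager.getOrCreatePartition("test-topic", 0, 3)
assert(partition.replicationFactor == 3)

// A partition this broker has never seen yields the -1 sentinel.
assert(replicaManager.getReplicationFactorForPartition("unknown-topic", 0) == -1)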

core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (5 changed lines)

@@ -21,7 +21,6 @@ import org.junit._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import java.nio.ByteBuffer
import kafka.api._
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker
import collection.mutable._
@@ -83,8 +82,8 @@ object SerializationTestUtils{
def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
val map = Map(((topic1, 0), leaderAndISR1),
((topic2, 0), leaderAndISR2))
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndISR1, 3)),
((topic2, 0), PartitionStateInfo(leaderAndISR2, 3)))
new LeaderAndIsrRequest(map)
}

core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (6 changed lines)

@@ -45,7 +45,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
replicaManager.checkpointHighWatermarks()
var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0)
val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
// create leader log
val log0 = getMockLog
// create leader and follower replicas
@@ -86,7 +86,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
replicaManager.checkpointHighWatermarks()
var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
// create leader log
val topic1Log0 = getMockLog
// create a local replica for topic1
@@ -102,7 +102,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
assertEquals(5L, topic1Partition0Hw)
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
// create leader log
val topic2Log0 = getMockLog
// create a local replica for topic2

core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (2 changed lines)

@@ -81,7 +81,7 @@ class ISRExpirationTest extends JUnit3Suite {
localLog: Log): Partition = {
val leaderId=config.brokerId
val replicaManager = new ReplicaManager(config, time, null, null, null)
val partition = replicaManager.getOrCreatePartition(topic, partitionId)
val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica

core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (2 changed lines)

@@ -237,7 +237,7 @@ class SimpleFetchTest extends JUnit3Suite {
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
val partition = new Partition(topic, partitionId, time, replicaManager)
val partition = new Partition(topic, partitionId, 2, time, replicaManager)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
