From eb06b93f97808a7071df43621bd09a4f168864f8 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Thu, 11 Oct 2012 23:30:23 +0000 Subject: [PATCH] KAFKA-510 Broker needs to know replication factor per partition; patched by Yang Ye; reviewed by Neha Narkhede, Jun Rao and Joel Koshy git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1397372 13f79535-47bb-0310-9956-ffa450edef68 --- .../kafka/api/LeaderAndISRResponse.scala | 2 +- .../scala/kafka/api/LeaderAndIsrRequest.scala | 68 +++++++++++-------- .../main/scala/kafka/cluster/Partition.scala | 9 +-- .../main/scala/kafka/cluster/Replica.scala | 2 +- .../controller/ControllerChannelManager.scala | 14 ++-- .../controller/PartitionStateMachine.scala | 9 +-- .../controller/ReplicaStateMachine.scala | 14 ++-- .../main/scala/kafka/server/KafkaApis.scala | 18 +++-- .../scala/kafka/server/ReplicaManager.scala | 68 ++++++++++++------- .../RequestResponseSerializationTest.scala | 5 +- .../server/HighwatermarkPersistenceTest.scala | 6 +- .../unit/kafka/server/ISRExpirationTest.scala | 2 +- .../unit/kafka/server/SimpleFetchTest.scala | 2 +- 13 files changed, 129 insertions(+), 90 deletions(-) diff --git a/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala b/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala index 41ec8fed74e..33e0ecb6364 100644 --- a/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala +++ b/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala @@ -44,7 +44,7 @@ object LeaderAndISRResponse { case class LeaderAndISRResponse(versionId: Short, responseMap: Map[(String, Int), Short], errorCode: Short = ErrorMapping.NoError) - extends RequestOrResponse{ + extends RequestOrResponse { def sizeInBytes(): Int ={ var size = 2 + 2 + 4 for ((key, value) <- responseMap){ diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 26f2bd88834..fd8ef8a8686 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -27,38 +27,46 @@ import collection.mutable.HashMap object LeaderAndIsr { val initialLeaderEpoch: Int = 0 val initialZKVersion: Int = 0 - def readFrom(buffer: ByteBuffer): LeaderAndIsr = { +} + +case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) { + def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion) + + override def toString(): String = { + val jsonDataMap = new HashMap[String, String] + jsonDataMap.put("leader", leader.toString) + jsonDataMap.put("leaderEpoch", leaderEpoch.toString) + jsonDataMap.put("ISR", isr.mkString(",")) + Utils.stringMapToJsonString(jsonDataMap) + } +} + + +object PartitionStateInfo { + def readFrom(buffer: ByteBuffer): PartitionStateInfo = { val leader = buffer.getInt val leaderGenId = buffer.getInt val ISRString = Utils.readShortString(buffer, "UTF-8") val ISR = ISRString.split(",").map(_.toInt).toList val zkVersion = buffer.getInt - new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion) + val replicationFactor = buffer.getInt + PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor) } } -case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){ - def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion) - +case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val 
replicationFactor: Int) { def writeTo(buffer: ByteBuffer) { - buffer.putInt(leader) - buffer.putInt(leaderEpoch) - Utils.writeShortString(buffer, isr.mkString(","), "UTF-8") - buffer.putInt(zkVersion) + buffer.putInt(leaderAndIsr.leader) + buffer.putInt(leaderAndIsr.leaderEpoch) + Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8") + buffer.putInt(leaderAndIsr.zkVersion) + buffer.putInt(replicationFactor) } def sizeInBytes(): Int = { - val size = 4 + 4 + (2 + isr.mkString(",").length) + 4 + val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4 size } - - override def toString(): String = { - val jsonDataMap = new HashMap[String, String] - jsonDataMap.put("leader", leader.toString) - jsonDataMap.put("leaderEpoch", leaderEpoch.toString) - jsonDataMap.put("ISR", isr.mkString(",")) - Utils.stringMapToJsonString(jsonDataMap) - } } @@ -73,17 +81,17 @@ object LeaderAndIsrRequest { val versionId = buffer.getShort val clientId = Utils.readShortString(buffer) val ackTimeoutMs = buffer.getInt - val leaderAndISRRequestCount = buffer.getInt - val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr] + val partitionStateInfosCount = buffer.getInt + val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo] - for(i <- 0 until leaderAndISRRequestCount){ + for(i <- 0 until partitionStateInfosCount){ val topic = Utils.readShortString(buffer, "UTF-8") val partition = buffer.getInt - val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer) + val partitionStateInfo = PartitionStateInfo.readFrom(buffer) - leaderAndISRInfos.put((topic, partition), leaderAndISRRequest) + partitionStateInfos.put((topic, partition), partitionStateInfo) } - new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos) + new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos) } } @@ -91,19 +99,19 @@ object LeaderAndIsrRequest { case class LeaderAndIsrRequest (versionId: Short, clientId: String, ackTimeoutMs: Int, - leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) + partitionStateInfos: Map[(String, Int), PartitionStateInfo]) extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { - def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = { - this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos) + def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo]) = { + this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) Utils.writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) - buffer.putInt(leaderAndISRInfos.size) - for((key, value) <- leaderAndISRInfos){ + buffer.putInt(partitionStateInfos.size) + for((key, value) <- partitionStateInfos){ Utils.writeShortString(buffer, key._1, "UTF-8") buffer.putInt(key._2) value.writeTo(buffer) @@ -112,7 +120,7 @@ case class LeaderAndIsrRequest (versionId: Short, def sizeInBytes(): Int = { var size = 1 + 2 + (2 + clientId.length) + 4 + 4 - for((key, value) <- leaderAndISRInfos) + for((key, value) <- partitionStateInfos) size += (2 + key._1.length) + 4 + value.sizeInBytes size } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3bd970e7349..fdabd6bf7e1 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ 
-21,15 +21,17 @@ import kafka.utils._ import java.lang.Object import kafka.api.LeaderAndIsr import kafka.server.ReplicaManager -import kafka.common.ErrorMapping import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup +import kafka.common.ErrorMapping + /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR */ class Partition(val topic: String, val partitionId: Int, + var replicationFactor: Int, time: Time, val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup { private val localBrokerId = replicaManager.config.brokerId @@ -57,8 +59,7 @@ class Partition(val topic: String, ) def isUnderReplicated(): Boolean = { - // TODO: need to pass in replication factor from controller - inSyncReplicas.size < replicaManager.config.defaultReplicationFactor + inSyncReplicas.size < replicationFactor } def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = { @@ -292,7 +293,7 @@ class Partition(val topic: String, info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", "))) val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion) + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion) if (updateSucceeded){ inSyncReplicas = newISR zkVersion = newVersion diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 6d0e57e3e86..88241d476a6 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -67,7 +67,7 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: Long) { if (isLocal) { trace("Setting hw for replica %d topic %s partition %d on broker %d to %d" - .format(brokerId, topic, partitionId, newHighWatermark)) + .format(brokerId, topic, partitionId, brokerId, newHighWatermark)) highWatermarkValue.set(newHighWatermark) } else throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local" diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index fb6ea76ecd5..137a0970198 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -139,7 +139,7 @@ class RequestSendThread(val controllerId: Int, class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit) extends Logging { - val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]] + val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]] val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] def newBatch() { @@ -151,10 +151,12 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques stopReplicaRequestMap.clear() } - def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) { + def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) { 
brokerIds.foreach { brokerId => - leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr]) - leaderAndIsrRequestMap(brokerId).put((topic, partition), leaderAndIsr) + leaderAndIsrRequestMap.getOrElseUpdate(brokerId, + new mutable.HashMap[(String, Int), PartitionStateInfo]) + leaderAndIsrRequestMap(brokerId).put((topic, partition), + PartitionStateInfo(leaderAndIsr, replicationFactor)) } } @@ -168,8 +170,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques def sendRequestsToBrokers() { leaderAndIsrRequestMap.foreach { m => val broker = m._1 - val leaderAndIsr = m._2 - val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr) + val partitionStateInfos = m._2 + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos) debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest)) sendRequest(broker, leaderAndIsrRequest, null) } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 59fbae3d942..41ef2d854a4 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -234,14 +234,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition, liveAssignedReplicas)) // make the first replica in the list of assigned replicas, the leader val leader = liveAssignedReplicas.head - var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList) + val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList) try { ZkUtils.createPersistentPath(controllerContext.zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString) + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString()) // NOTE: the above write can fail only if the current controller lost its zk session and the new controller // took over and initialized this partition. This can happen if the current controller went into a long // GC pause - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr, replicaAssignment.size) controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader) partitionState.put((topic, partition), OnlinePartition) }catch { @@ -283,7 +283,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader) info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition)) // store new leader and isr info in cache - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, + topic, partition, newLeaderAndIsr, controllerContext.partitionReplicaAssignment((topic, partition)).size) }catch { case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead." 
.format(topic, partition) + " Marking this partition offline", poe) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index fbef90dd14a..6e207fa7bc2 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -43,7 +43,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val zkClient = controllerContext.zkClient var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest) - private var isShuttingDown = new AtomicBoolean(false) + private val isShuttingDown = new AtomicBoolean(false) /** * Invoked on successful controller election. First registers a broker change listener since that triggers all @@ -101,6 +101,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { try { replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica) + val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition)) targetState match { case NewReplica => assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState) @@ -111,7 +112,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if(leaderAndIsr.leader == replicaId) throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica" .format(replicaId, topic, partition) + "state as it is being requested to become leader") - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), + topic, partition, leaderAndIsr, replicaAssignment.size) case None => // new leader request will be sent to this replica when one gets elected } replicaState.put((topic, partition, replicaId), NewReplica) @@ -143,7 +145,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case Some(leaderAndIsr) => controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match { case true => // leader is alive - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), + topic, partition, leaderAndIsr, replicaAssignment.size) replicaState.put((topic, partition, replicaId), OnlineReplica) info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) case false => // ignore partitions whose leader is not alive @@ -167,7 +170,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) // update the new leadership decision in zookeeper or retry val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion zookeeperPathUpdateSucceeded = updateSucceeded @@ -176,7 +179,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } } // send 
the shrunk ISR state change request only to the leader - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), + topic, partition, newLeaderAndIsr, replicaAssignment.size) // update the local leader and isr cache controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader) replicaState.put((topic, partition, replicaId), OfflineReplica) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 60d6d46a622..470c4ddc43b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,7 +17,6 @@ package kafka.server -import java.io.IOException import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.message._ @@ -26,7 +25,6 @@ import kafka.utils.{Pool, SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ import mutable.HashMap -import scala.math._ import kafka.network.RequestChannel.Response import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ @@ -34,6 +32,7 @@ import kafka.metrics.KafkaMetricsGroup import org.I0Itec.zkclient.ZkClient import kafka.common._ + /** * Logic to handle the various Kafka requests */ @@ -132,10 +131,14 @@ class KafkaApis(val requestChannel: RequestChannel, produceRequest.data.foreach(partitionAndData => maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2)) + val allPartitionHaveReplicationFactorOne = + !produceRequest.data.keySet.exists( + m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || - numPartitionsInError == produceRequest.numPartitions) { + allPartitionHaveReplicationFactorOne || + numPartitionsInError == produceRequest.numPartitions){ val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) @@ -517,8 +520,13 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Checking producer request satisfaction for %s-%d, acksPending = %b" .format(topic, partitionId, fetchPartitionStatus.acksPending)) if (fetchPartitionStatus.acksPending) { - val partition = replicaManager.getOrCreatePartition(topic, partitionId) - val (hasEnough, errorCode) = partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks) + val partitionOpt = replicaManager.getPartition(topic, partitionId) + val (hasEnough, errorCode) = partitionOpt match { + case Some(partition) => + partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks) + case None => + (false, ErrorMapping.UnknownTopicOrPartitionCode) + } if (errorCode != ErrorMapping.NoError) { fetchPartitionStatus.acksPending = false fetchPartitionStatus.error = errorCode diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 515ba5a3788..d6211ccf7e2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -22,11 +22,12 @@ import 
org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean import kafka.utils._ import kafka.log.LogManager -import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr} -import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit +import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} +import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest} + object ReplicaManager { val UnknownLogEndOffset = -1L @@ -39,7 +40,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient private val leaderPartitionsLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " - private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) @@ -69,6 +69,20 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) } + /** + * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest(). + * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory + */ + def getReplicationFactorForPartition(topic: String, partitionId: Int) = { + val partitionOpt = getPartition(topic, partitionId) + partitionOpt match { + case Some(partition) => + partition.replicationFactor + case None => + -1 + } + } + def startup() { // start ISR expiration thread kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) @@ -93,10 +107,10 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient errorCode } - def getOrCreatePartition(topic: String, partitionId: Int): Partition = { + def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = { var partition = allPartitions.get((topic, partitionId)) if (partition == null) { - allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this)) + allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this)) partition = allPartitions.get((topic, partitionId)) } partition @@ -125,10 +139,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient } } - def getOrCreateReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Replica = { - getOrCreatePartition(topic, partitionId).getOrCreateReplica(replicaId) - } - def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = { val partitionOpt = getPartition(topic, partitionId) partitionOpt match { @@ -141,23 +151,23 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient info("Handling leader and isr request %s".format(leaderAndISRRequest)) val responseMap = new collection.mutable.HashMap[(String, Int), Short] - for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ + for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){ var errorCode = 
ErrorMapping.NoError - val topic = partitionInfo._1 - val partitionId = partitionInfo._2 + val topic = topicAndPartition._1 + val partitionId = topicAndPartition._2 - val requestedLeaderId = leaderAndISR.leader + val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader try { if(requestedLeaderId == config.brokerId) - makeLeader(topic, partitionId, leaderAndISR) + makeLeader(topic, partitionId, partitionStateInfo) else - makeFollower(topic, partitionId, leaderAndISR) + makeFollower(topic, partitionId, partitionStateInfo) } catch { case e => error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e) errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) } - responseMap.put(partitionInfo, errorCode) + responseMap.put(topicAndPartition, errorCode) } /** @@ -167,7 +177,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient */ // if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){ // startHighWaterMarksCheckPointThread -// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1) +// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1) // info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) // partitionsToRemove.foreach(p => stopReplica(p._1, p._2)) // } @@ -175,10 +185,11 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient responseMap } - private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = { + private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = { + val leaderAndIsr = partitionStateInfo.leaderAndIsr info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) - val partition = getOrCreatePartition(topic, partitionId) - if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) { + val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) + if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) { // also add this partition to the list of partitions for which the leader is the current broker leaderPartitionsLock synchronized { leaderPartitions += partition @@ -187,13 +198,14 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId)) } - private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) { - val leaderBrokerId: Int = leaderAndISR.leader + private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) { + val leaderAndIsr = partitionStateInfo.leaderAndIsr + val leaderBrokerId: Int = leaderAndIsr.leader info("Starting the follower state transition to follow leader %d for topic %s partition %d" .format(leaderBrokerId, topic, partitionId)) - val partition = getOrCreatePartition(topic, partitionId) - if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) { + val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) + if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) { // remove this replica's partition from the ISR expiration queue leaderPartitionsLock synchronized { leaderPartitions -= partition @@ -209,8 +221,12 @@ class ReplicaManager(val 
config: KafkaConfig, time: Time, val zkClient: ZkClient } def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = { - val partition = getOrCreatePartition(topic, partitionId) - partition.updateLeaderHWAndMaybeExpandISR(replicaId, offset) + val partitionOpt = getPartition(topic, partitionId) + if(partitionOpt.isDefined){ + partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset) + } else { + warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + } } /** diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 12cebe677e7..0d97281c5aa 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -21,7 +21,6 @@ import org.junit._ import org.scalatest.junit.JUnitSuite import junit.framework.Assert._ import java.nio.ByteBuffer -import kafka.api._ import kafka.message.{Message, ByteBufferMessageSet} import kafka.cluster.Broker import collection.mutable._ @@ -83,8 +82,8 @@ object SerializationTestUtils{ def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = { val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1) val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2) - val map = Map(((topic1, 0), leaderAndISR1), - ((topic2, 0), leaderAndISR2)) + val map = Map(((topic1, 0), PartitionStateInfo(leaderAndISR1, 3)), + ((topic2, 0), PartitionStateInfo(leaderAndISR2, 3))) new LeaderAndIsrRequest(map) } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index e347c427924..ca5fcb2539c 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -45,7 +45,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.checkpointHighWatermarks() var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) assertEquals(0L, fooPartition0Hw) - val partition0 = replicaManager.getOrCreatePartition(topic, 0) + val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) // create leader log val log0 = getMockLog // create leader and follower replicas @@ -86,7 +86,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.checkpointHighWatermarks() var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) assertEquals(0L, topic1Partition0Hw) - val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0) + val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) // create leader log val topic1Log0 = getMockLog // create a local replica for topic1 @@ -102,7 +102,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark - val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0) + val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) // create leader log val topic2Log0 = getMockLog // create a local replica for topic2 diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 
0cc0695f93a..e8bf168c77c 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -81,7 +81,7 @@ class ISRExpirationTest extends JUnit3Suite { localLog: Log): Partition = { val leaderId=config.brokerId val replicaManager = new ReplicaManager(config, time, null, null, null) - val partition = replicaManager.getOrCreatePartition(topic, partitionId) + val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index c675f1fe71a..cfd8dd71346 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -237,7 +237,7 @@ class SimpleFetchTest extends JUnit3Suite { private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = { - val partition = new Partition(topic, partitionId, time, replicaManager) + val partition = new Partition(topic, partitionId, 2, time, replicaManager) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
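
With this patch, each (topic, partition) entry in a LeaderAndIsrRequest carries a PartitionStateInfo rather than a bare LeaderAndIsr. Per the writeTo/readFrom/sizeInBytes code above, the per-partition payload is: leader (int32), leaderEpoch (int32), the ISR as a short string (2-byte length prefix plus a UTF-8 comma-separated list of broker ids), zkVersion (int32), and the new replicationFactor (int32). The sketch below is a minimal, self-contained round-trip of that layout; it uses simplified stand-in classes rather than the real kafka.api types and approximates Utils.writeShortString/readShortString with a plain length prefix.

import java.nio.ByteBuffer

// Simplified stand-ins for kafka.api.LeaderAndIsr and PartitionStateInfo (illustrative only).
case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

case class PartitionStateInfo(leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
  // Same field order as PartitionStateInfo.writeTo in the patch; the ISR is written as a
  // 2-byte-length-prefixed UTF-8 string, standing in for Utils.writeShortString.
  def writeTo(buffer: ByteBuffer) {
    buffer.putInt(leaderAndIsr.leader)
    buffer.putInt(leaderAndIsr.leaderEpoch)
    val isrBytes = leaderAndIsr.isr.mkString(",").getBytes("UTF-8")
    buffer.putShort(isrBytes.length.toShort)
    buffer.put(isrBytes)
    buffer.putInt(leaderAndIsr.zkVersion)
    buffer.putInt(replicationFactor)
  }

  // Matches the patch's sizeInBytes: 4 + 4 + (2 + ISR csv length) + 4 + 4
  def sizeInBytes: Int = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
}

object PartitionStateInfo {
  // Mirrors PartitionStateInfo.readFrom, including the trailing replicationFactor int.
  def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
    val leader = buffer.getInt
    val leaderEpoch = buffer.getInt
    val isrBytes = new Array[Byte](buffer.getShort.toInt)
    buffer.get(isrBytes)
    val isr = new String(isrBytes, "UTF-8").split(",").map(_.toInt).toList
    val zkVersion = buffer.getInt
    val replicationFactor = buffer.getInt
    PartitionStateInfo(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), replicationFactor)
  }
}

object PartitionStateInfoRoundTrip extends App {
  val original = PartitionStateInfo(LeaderAndIsr(1, 0, List(1, 2, 3), 0), replicationFactor = 3)
  val buffer = ByteBuffer.allocate(original.sizeInBytes)
  original.writeTo(buffer)
  buffer.rewind()
  require(PartitionStateInfo.readFrom(buffer) == original)
  println("round-trip ok, replicationFactor = " + original.replicationFactor)
}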
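
On the broker side, the replication factor shipped in PartitionStateInfo is what Partition.isUnderReplicated now compares the ISR size against (previously it fell back to config.defaultReplicationFactor), and KafkaApis uses ReplicaManager.getReplicationFactorForPartition to answer a produce request immediately when every partition it touches has a replication factor of one; the controller fills in the value from the size of the partition's replica assignment. A minimal sketch of those two checks, with simplified stand-ins for Partition and ReplicaManager (names and fields outside the patch are illustrative):

// Simplified stand-ins; the real classes live in kafka.cluster and kafka.server.
case class Partition(topic: String, partitionId: Int, replicationFactor: Int, inSyncReplicas: Set[Int]) {
  // Mirrors Partition.isUnderReplicated after the patch: compare the ISR size against
  // the partition's own replication factor, not a broker-wide default.
  def isUnderReplicated: Boolean = inSyncReplicas.size < replicationFactor
}

class ReplicaManager(partitions: Map[(String, Int), Partition]) {
  // Mirrors ReplicaManager.getReplicationFactorForPartition: -1 when the partition
  // has not been created on this broker yet.
  def getReplicationFactorForPartition(topic: String, partitionId: Int): Int =
    partitions.get((topic, partitionId)).map(_.replicationFactor).getOrElse(-1)

  // Mirrors the new KafkaApis check: a produce request need not wait on followers
  // if every partition it touches has replication factor 1.
  def allPartitionsHaveReplicationFactorOne(requestPartitions: Set[(String, Int)]): Boolean =
    !requestPartitions.exists { case (t, p) => getReplicationFactorForPartition(t, p) != 1 }
}

object ReplicationFactorChecks extends App {
  val p = Partition("foo", 0, replicationFactor = 3, inSyncReplicas = Set(1, 2))
  println(p.isUnderReplicated)                                        // true: ISR size 2 < 3

  val rm = new ReplicaManager(Map(("foo", 0) -> p))
  println(rm.getReplicationFactorForPartition("bar", 0))              // -1, partition unknown
  println(rm.allPartitionsHaveReplicationFactorOne(Set(("foo", 0))))  // false
}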