From 22e4341ba9f7744314ae0363529333a276bd325d Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Mon, 10 Sep 2012 17:09:52 +0000 Subject: [PATCH] KAFKA-498: Controller has race conditions and synchronization bugs; patched by Neha Narkhede; reviewed by Jun Rao git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1382988 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/admin/AdminUtils.scala | 14 +- .../scala/kafka/api/LeaderAndISRRequest.scala | 121 ---- .../scala/kafka/api/LeaderAndIsrRequest.scala | 121 ++++ .../src/main/scala/kafka/cluster/Broker.scala | 4 +- .../main/scala/kafka/cluster/Partition.scala | 88 +-- .../common/BrokerNotAvailableException.scala | 22 + .../common/BrokerNotExistException.scala | 22 - .../scala/kafka/common/ErrorMapping.scala | 4 +- .../common/PartitionOfflineException.scala | 26 + .../consumer/ZookeeperConsumerConnector.scala | 29 +- .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../scala/kafka/server/KafkaController.scala | 595 ++++++++++-------- .../scala/kafka/server/ReplicaManager.scala | 10 +- .../scala/kafka/utils/UpdateOffsetsInZK.scala | 21 +- core/src/main/scala/kafka/utils/Utils.scala | 1 - core/src/main/scala/kafka/utils/ZkUtils.scala | 248 ++++---- .../network/RpcDataSerializationTest.scala | 10 +- .../kafka/server/RequestPurgatoryTest.scala | 2 +- .../scala/unit/kafka/utils/TestUtils.scala | 12 +- 19 files changed, 731 insertions(+), 621 deletions(-) create mode 100644 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala create mode 100644 core/src/main/scala/kafka/common/BrokerNotAvailableException.scala create mode 100644 core/src/main/scala/kafka/common/PartitionOfflineException.scala diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 28ca1d7d130..e2367684c03 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -24,7 +24,7 @@ import kafka.utils.{Logging, Utils, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection.mutable -import kafka.common.{LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping} +import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping} object AdminUtils extends Logging { val rand = new Random @@ -148,13 +148,11 @@ object AdminUtils extends Logging { optionalBrokerInfo match { case Some(brokerInfo) => brokerInfo // return broker info from the cache case None => // fetch it from zookeeper - try { - val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head - cachedBrokerInfo += (id -> brokerInfo) - brokerInfo - }catch { - case e => error("Failed to fetch broker info for broker id " + id) - throw e + ZkUtils.getBrokerInfo(zkClient, id) match { + case Some(brokerInfo) => + cachedBrokerInfo += (id -> brokerInfo) + brokerInfo + case None => throw new BrokerNotAvailableException("Failed to fetch broker info for broker " + id) } } } diff --git a/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala b/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala index 4c42e01b89a..e69de29bb2d 100644 --- a/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package kafka.api - -import java.nio._ -import kafka.utils._ -import collection.mutable.Map -import collection.mutable.HashMap - - -object LeaderAndISR { - val initialLeaderEpoch: Int = 0 - val initialZKVersion: Int = 0 - def readFrom(buffer: ByteBuffer): LeaderAndISR = { - val leader = buffer.getInt - val leaderGenId = buffer.getInt - val ISRString = Utils.readShortString(buffer, "UTF-8") - val ISR = ISRString.split(",").map(_.toInt).toList - val zkVersion = buffer.getInt - new LeaderAndISR(leader, leaderGenId, ISR, zkVersion) - } -} - -case class LeaderAndISR(var leader: Int, var leaderEpoch: Int, var ISR: List[Int], var zkVersion: Int){ - def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoch, ISR, LeaderAndISR.initialZKVersion) - - def writeTo(buffer: ByteBuffer) { - buffer.putInt(leader) - buffer.putInt(leaderEpoch) - Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8") - buffer.putInt(zkVersion) - } - - def sizeInBytes(): Int = { - val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4 - size - } - - override def toString(): String = { - val jsonDataMap = new HashMap[String, String] - jsonDataMap.put("leader", leader.toString) - jsonDataMap.put("leaderEpoch", leaderEpoch.toString) - jsonDataMap.put("ISR", ISR.mkString(",")) - Utils.stringMapToJsonString(jsonDataMap) - } -} - - -object LeaderAndISRRequest { - val CurrentVersion = 1.shortValue() - val DefaultClientId = "" - val IsInit: Boolean = true - val NotInit: Boolean = false - val DefaultAckTimeout: Int = 1000 - - def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = { - val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) - val isInit = if(buffer.get() == 1.toByte) true else false - val ackTimeoutMs = buffer.getInt - val leaderAndISRRequestCount = buffer.getInt - val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR] - - for(i <- 0 until leaderAndISRRequestCount){ - val topic = Utils.readShortString(buffer, "UTF-8") - val partition = buffer.getInt - val leaderAndISRRequest = LeaderAndISR.readFrom(buffer) - - leaderAndISRInfos.put((topic, partition), leaderAndISRRequest) - } - new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos) - } -} - - -case class LeaderAndISRRequest (versionId: Short, - clientId: String, - isInit: Boolean, - ackTimeoutMs: Int, - leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) - extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) { - def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = { - this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos) - } - - def writeTo(buffer: ByteBuffer) { - buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) - buffer.put(if(isInit) 
1.toByte else 0.toByte) - buffer.putInt(ackTimeoutMs) - buffer.putInt(leaderAndISRInfos.size) - for((key, value) <- leaderAndISRInfos){ - Utils.writeShortString(buffer, key._1, "UTF-8") - buffer.putInt(key._2) - value.writeTo(buffer) - } - } - - def sizeInBytes(): Int = { - var size = 1 + 2 + (2 + clientId.length) + 4 + 4 - for((key, value) <- leaderAndISRInfos) - size += (2 + key._1.length) + 4 + value.sizeInBytes - size - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala new file mode 100644 index 00000000000..edad0df15d4 --- /dev/null +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package kafka.api + +import java.nio._ +import kafka.utils._ +import collection.mutable.Map +import collection.mutable.HashMap + + +object LeaderAndIsr { + val initialLeaderEpoch: Int = 0 + val initialZKVersion: Int = 0 + def readFrom(buffer: ByteBuffer): LeaderAndIsr = { + val leader = buffer.getInt + val leaderGenId = buffer.getInt + val ISRString = Utils.readShortString(buffer, "UTF-8") + val ISR = ISRString.split(",").map(_.toInt).toList + val zkVersion = buffer.getInt + new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion) + } +} + +case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){ + def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion) + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(leader) + buffer.putInt(leaderEpoch) + Utils.writeShortString(buffer, isr.mkString(","), "UTF-8") + buffer.putInt(zkVersion) + } + + def sizeInBytes(): Int = { + val size = 4 + 4 + (2 + isr.mkString(",").length) + 4 + size + } + + override def toString(): String = { + val jsonDataMap = new HashMap[String, String] + jsonDataMap.put("leader", leader.toString) + jsonDataMap.put("leaderEpoch", leaderEpoch.toString) + jsonDataMap.put("ISR", isr.mkString(",")) + Utils.stringMapToJsonString(jsonDataMap) + } +} + + +object LeaderAndIsrRequest { + val CurrentVersion = 1.shortValue() + val DefaultClientId = "" + val IsInit: Boolean = true + val NotInit: Boolean = false + val DefaultAckTimeout: Int = 1000 + + def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = { + val versionId = buffer.getShort + val clientId = Utils.readShortString(buffer) + val isInit = if(buffer.get() == 1.toByte) true else false + val ackTimeoutMs = buffer.getInt + val leaderAndISRRequestCount = buffer.getInt + val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr] + + for(i <- 0 until leaderAndISRRequestCount){ + val topic = Utils.readShortString(buffer, "UTF-8") + val partition = 
buffer.getInt + val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer) + + leaderAndISRInfos.put((topic, partition), leaderAndISRRequest) + } + new LeaderAndIsrRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos) + } +} + + +case class LeaderAndIsrRequest (versionId: Short, + clientId: String, + isInit: Boolean, + ackTimeoutMs: Int, + leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) + extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) { + def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = { + this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos) + } + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + Utils.writeShortString(buffer, clientId) + buffer.put(if(isInit) 1.toByte else 0.toByte) + buffer.putInt(ackTimeoutMs) + buffer.putInt(leaderAndISRInfos.size) + for((key, value) <- leaderAndISRInfos){ + Utils.writeShortString(buffer, key._1, "UTF-8") + buffer.putInt(key._2) + value.writeTo(buffer) + } + } + + def sizeInBytes(): Int = { + var size = 1 + 2 + (2 + clientId.length) + 4 + 4 + for((key, value) <- leaderAndISRInfos) + size += (2 + key._1.length) + 4 + value.sizeInBytes + size + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index b42dbdfdad5..9b57b420092 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -19,7 +19,7 @@ package kafka.cluster import kafka.utils.Utils._ import java.nio.ByteBuffer -import kafka.common.BrokerNotExistException +import kafka.common.BrokerNotAvailableException /** * A Kafka broker @@ -28,7 +28,7 @@ private[kafka] object Broker { def createBroker(id: Int, brokerInfoString: String): Broker = { if(brokerInfoString == null) - throw new BrokerNotExistException("Broker id %s does not exist".format(id)) + throw new BrokerNotAvailableException("Broker id %s does not exist".format(id)) val brokerInfo = brokerInfoString.split(":") new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt) } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5887694fe04..22e95ce0fdd 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -19,7 +19,7 @@ package kafka.cluster import scala.collection._ import kafka.utils._ import java.lang.Object -import kafka.api.LeaderAndISR +import kafka.api.LeaderAndIsr import kafka.server.ReplicaManager import kafka.common.ErrorMapping @@ -39,8 +39,8 @@ class Partition(val topic: String, var inSyncReplicas: Set[Replica] = Set.empty[Replica] private val assignedReplicaMap = new Pool[Int,Replica] private val leaderISRUpdateLock = new Object - private var zkVersion: Int = LeaderAndISR.initialZKVersion - private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1 + private var zkVersion: Int = LeaderAndIsr.initialZKVersion + private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId) private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) @@ -53,7 +53,7 @@ class Partition(val topic: String, if (isReplicaLocal(replicaId)) { val log = logManager.getOrCreateLog(topic, partitionId) val localReplica = new Replica(replicaId, this, time, - 
highwaterMarkCheckpoint.read(topic, partitionId), Some(log)) + highwaterMarkCheckpoint.read(topic, partitionId), Some(log)) addReplicaIfNotExists(localReplica) } else { @@ -97,7 +97,7 @@ class Partition(val topic: String, /** * If the leaderEpoch of the incoming request is higher than locally cached epoch, make it the new leader of follower to the new leader. */ - def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = { + def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = { leaderISRUpdateLock synchronized { if (leaderEpoch >= leaderAndISR.leaderEpoch){ info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request" @@ -119,20 +119,20 @@ class Partition(val topic: String, * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) * 4. set the new leader and ISR */ - private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) { - trace("Started to become leader at the request %s".format(leaderAndISR.toString())) - // stop replica fetcher thread, if any - replicaFetcherManager.removeFetcher(topic, partitionId) + private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) { + trace("Started to become leader at the request %s".format(leaderAndISR.toString())) + // stop replica fetcher thread, if any + replicaFetcherManager.removeFetcher(topic, partitionId) - val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet - // reset LogEndOffset for remote replicas - assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) - inSyncReplicas = newInSyncReplicas - leaderEpoch = leaderAndISR.leaderEpoch - zkVersion = leaderAndISR.zkVersion - leaderReplicaIdOpt = Some(localBrokerId) - // we may need to increment high watermark since ISR could be down to 1 - maybeIncrementLeaderHW(getReplica().get) + val newInSyncReplicas = leaderAndISR.isr.map(r => getOrCreateReplica(r)).toSet + // reset LogEndOffset for remote replicas + assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) + inSyncReplicas = newInSyncReplicas + leaderEpoch = leaderAndISR.leaderEpoch + zkVersion = leaderAndISR.zkVersion + leaderReplicaIdOpt = Some(localBrokerId) + // we may need to increment high watermark since ISR could be down to 1 + maybeIncrementLeaderHW(getReplica().get) } /** @@ -141,24 +141,28 @@ class Partition(val topic: String, * 3. set the leader and set ISR to empty * 4. 
start a fetcher to the new leader */ - private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = { - trace("Started to become follower at the request %s".format(leaderAndISR.toString())) - val newLeaderBrokerId: Int = leaderAndISR.leader - info("Starting the follower state transition to follow leader %d for topic %s partition %d" - .format(newLeaderBrokerId, topic, partitionId)) - val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head - // stop fetcher thread to previous leader - replicaFetcherManager.removeFetcher(topic, partitionId) - - // make sure local replica exists - val localReplica = getOrCreateReplica() - localReplica.log.get.truncateTo(localReplica.highWatermark) - inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndISR.leaderEpoch - zkVersion = leaderAndISR.zkVersion - leaderReplicaIdOpt = Some(newLeaderBrokerId) - // start fetcher thread to current leader - replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) + private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = { + trace("Started to become follower at the request %s".format(leaderAndISR.toString())) + val newLeaderBrokerId: Int = leaderAndISR.leader + info("Starting the follower state transition to follow leader %d for topic %s partition %d" + .format(newLeaderBrokerId, topic, partitionId)) + ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match { + case Some(leaderBroker) => + // stop fetcher thread to previous leader + replicaFetcherManager.removeFetcher(topic, partitionId) + // make sure local replica exists + val localReplica = getOrCreateReplica() + localReplica.log.get.truncateTo(localReplica.highWatermark) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndISR.leaderEpoch + zkVersion = leaderAndISR.zkVersion + leaderReplicaIdOpt = Some(newLeaderBrokerId) + // start fetcher thread to current leader + replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) + case None => // leader went down + warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) + + " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId)) + } } def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) { @@ -197,7 +201,7 @@ class Partition(val topic: String, }) trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) || - (requiredAcks > 0 && numAcks >= requiredAcks)) { + (requiredAcks > 0 && numAcks >= requiredAcks)) { /* * requiredAcks < 0 means acknowledge after all replicas in ISR * are fully caught up to the (local) leader's offset @@ -211,7 +215,7 @@ class Partition(val topic: String, } } } - + def maybeIncrementLeaderHW(leaderReplica: Replica) { val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min @@ -220,7 +224,7 @@ class Partition(val topic: String, leaderReplica.highWatermark = newHighWatermark else debug("Old hw for topic %s partition %d is %d. New hw is %d. 
All leo's are %s" - .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) + .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) } def maybeShrinkISR(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) { @@ -249,7 +253,7 @@ class Partition(val topic: String, * for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the * follower is not catching up and should be removed from the ISR - **/ + **/ val leaderLogEndOffset = leaderReplica.logEndOffset val candidateReplicas = inSyncReplicas - leaderReplica // Case 1 above @@ -266,9 +270,9 @@ class Partition(val topic: String, private def updateISR(newISR: Set[Replica]) { info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(","))) - val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) + val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion) + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion) if (updateSucceeded){ inSyncReplicas = newISR zkVersion = newVersion diff --git a/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala b/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala new file mode 100644 index 00000000000..611bed6f318 --- /dev/null +++ b/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +class BrokerNotAvailableException(message: String) extends RuntimeException(message) { + def this() = this(null) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/BrokerNotExistException.scala b/core/src/main/scala/kafka/common/BrokerNotExistException.scala index 8483dffc50f..e69de29bb2d 100644 --- a/core/src/main/scala/kafka/common/BrokerNotExistException.scala +++ b/core/src/main/scala/kafka/common/BrokerNotExistException.scala @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -class BrokerNotExistException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 54312c0fa6b..d39cf026632 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -38,7 +38,7 @@ object ErrorMapping { val LeaderNotAvailableCode : Short = 6 val NotLeaderForPartitionCode : Short = 7 val RequestTimedOutCode: Short = 8 - val BrokerNotExistInZookeeperCode: Short = 9 + val BrokerNotAvailableCode: Short = 9 val ReplicaNotAvailableCode: Short = 10 private val exceptionToCode = @@ -51,7 +51,7 @@ object ErrorMapping { classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode, classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode, classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode, - classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode, + classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode, classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode ).withDefaultValue(UnknownCode) diff --git a/core/src/main/scala/kafka/common/PartitionOfflineException.scala b/core/src/main/scala/kafka/common/PartitionOfflineException.scala new file mode 100644 index 00000000000..ab7a095caae --- /dev/null +++ b/core/src/main/scala/kafka/common/PartitionOfflineException.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.common + +/** + * This exception is thrown by the leader elector in the controller when leader election fails for a partition since + * all the replicas for a partition are offline + */ +class PartitionOfflineException(message: String) extends RuntimeException(message) { + def this() = this(null) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 99a795030db..955590352ab 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -292,21 +292,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, return partitionInfo.getConsumeOffset } - //otherwise, try to get it from zookeeper + // otherwise, try to get it from zookeeper try { val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) val znode = topicDirs.consumerOffsetDir + "/" + partitionId val offsetString = readDataMaybeNull(zkClient, znode)._1 - if (offsetString != null) - return offsetString.toLong - else - return -1 + offsetString match { + case Some(offset) => offset.toLong + case None => -1L + } } catch { case e => error("error in getConsumedOffset JMX ", e) + -2L } - return -2 } def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = @@ -649,18 +649,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val znode = topicDirs.consumerOffsetDir + "/" + partition val offsetString = readDataMaybeNull(zkClient, znode)._1 // If first time starting a consumer, set the initial offset based on the config - var offset : Long = 0L - if (offsetString == null) - offset = config.autoOffsetReset match { + val offset = + offsetString match { + case Some(offsetStr) => offsetStr.toLong + case None => + config.autoOffsetReset match { case OffsetRequest.SmallestTimeString => - earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime) + earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime) case OffsetRequest.LargestTimeString => - earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime) + earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime) case _ => - throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig") + throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig") + } } - else - offset = offsetString.toLong val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) val fetchedOffset = new AtomicLong(offset) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2c1db31d2ca..5cebb196405 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -66,7 +66,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleLeaderAndISRRequest(request: RequestChannel.Request){ - val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer) + val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer) if(requestLogger.isTraceEnabled) requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest) diff --git a/core/src/main/scala/kafka/server/KafkaController.scala 
b/core/src/main/scala/kafka/server/KafkaController.scala index 99d002cfe09..6bd8ff251e1 100644 --- a/core/src/main/scala/kafka/server/KafkaController.scala +++ b/core/src/main/scala/kafka/server/KafkaController.scala @@ -22,20 +22,21 @@ import collection.immutable.Set import kafka.cluster.Broker import kafka.api._ import kafka.network.{Receive, BlockingChannel} -import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener} import org.apache.zookeeper.Watcher.Event.KeeperState import collection.JavaConversions._ import kafka.utils.{ShutdownableThread, ZkUtils, Logging} import java.lang.Object +import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} +import kafka.common.{KafkaException, PartitionOfflineException} class RequestSendThread(val controllerId: Int, val toBrokerId: Int, val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], val channel: BlockingChannel) - extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)){ + extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) { private val lock = new Object() override def doWork(): Unit = { @@ -44,163 +45,157 @@ class RequestSendThread(val controllerId: Int, val callback = queueItem._2 var receive: Receive = null - - try{ - lock synchronized { - channel.send(request) - receive = channel.receive() - var response: RequestOrResponse = null - request.requestId.get match { - case RequestKeys.LeaderAndISRRequest => - response = LeaderAndISRResponse.readFrom(receive.buffer) - case RequestKeys.StopReplicaRequest => - response = StopReplicaResponse.readFrom(receive.buffer) - } - trace("got a response %s".format(controllerId, response, toBrokerId)) - if(callback != null){ - callback(response) - } - } - } catch { - case e => - // log it and let it go. Let controller shut it down. - debug("Exception occurs", e) + try{ + lock synchronized { + channel.send(request) + receive = channel.receive() + var response: RequestOrResponse = null + request.requestId.get match { + case RequestKeys.LeaderAndISRRequest => + response = LeaderAndISRResponse.readFrom(receive.buffer) + case RequestKeys.StopReplicaRequest => + response = StopReplicaResponse.readFrom(receive.buffer) + } + trace("got a response %s".format(controllerId, response, toBrokerId)) + + if(callback != null){ + callback(response) } } - + } catch { + case e => + // log it and let it go. Let controller shut it down. 
+ debug("Exception occurs", e) + } + } } -class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{ - private val brokers = new HashMap[Int, Broker] - private val messageChannels = new HashMap[Int, BlockingChannel] - private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]] - private val messageThreads = new HashMap[Int, RequestSendThread] - private val lock = new Object +class ControllerChannelManager private (config: KafkaConfig) extends Logging { + private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] + private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "], " - for(broker <- allBrokers){ - brokers.put(broker.id, broker) - info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port) - val channel = new BlockingChannel(broker.host, broker.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - config.controllerSocketTimeoutMs) - channel.connect() - messageChannels.put(broker.id, channel) - messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)) + + def this(allBrokers: Set[Broker], config : KafkaConfig) { + this(config) + allBrokers.foreach(addNewBroker(_)) } def startup() = { - for((brokerId, broker) <- brokers){ - val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId)) - thread.setDaemon(false) - thread.start() - messageThreads.put(broker.id, thread) + brokerLock synchronized { + brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1)) } } def shutdown() = { - lock synchronized { - for((brokerId, broker) <- brokers){ - removeBroker(brokerId) - } + brokerLock synchronized { + brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1)) } } - def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null){ - messageQueues(brokerId).put((request, callback)) + def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) { + brokerLock synchronized { + brokerStateInfo(brokerId).messageQueue.put((request, callback)) + } } - def addBroker(broker: Broker){ - lock synchronized { - brokers.put(broker.id, broker) - messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)) - val channel = new BlockingChannel(broker.host, broker.port, - BlockingChannel.UseDefaultBufferSize, - BlockingChannel.UseDefaultBufferSize, - config.controllerSocketTimeoutMs) - channel.connect() - messageChannels.put(broker.id, channel) - val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id)) - thread.setDaemon(false) - thread.start() - messageThreads.put(broker.id, thread) + def addBroker(broker: Broker) { + brokerLock synchronized { + addNewBroker(broker) + startRequestSendThread(broker.id) } } - def removeBroker(brokerId: Int){ - lock synchronized { - brokers.remove(brokerId) - try { - messageChannels(brokerId).disconnect() - messageChannels.remove(brokerId) - messageQueues.remove(brokerId) - messageThreads(brokerId).shutdown() - messageThreads.remove(brokerId) - }catch { - case e => error("Error while removing broker by the controller", e) - } + def removeBroker(brokerId: Int) { 
+ brokerLock synchronized { + removeExistingBroker(brokerId) + } + } + + private def addNewBroker(broker: Broker) { + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize) + val channel = new BlockingChannel(broker.host, broker.port, + BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, + config.controllerSocketTimeoutMs) + channel.connect() + val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel) + requestThread.setDaemon(false) + brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread)) + } + + private def removeExistingBroker(brokerId: Int) { + try { + brokerStateInfo(brokerId).channel.disconnect() + brokerStateInfo(brokerId).requestSendThread.shutdown() + brokerStateInfo.remove(brokerId) + }catch { + case e => error("Error while removing broker by the controller", e) } } + + private def startRequestSendThread(brokerId: Int) { + brokerStateInfo(brokerId).requestSendThread.start() + } } +case class ControllerBrokerStateInfo(channel: BlockingChannel, + broker: Broker, + messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], + requestSendThread: RequestSendThread) + class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging { this.logIdent = "[Controller " + config.brokerId + "], " - info("startup"); private var isRunning = true private val controllerLock = new Object private var controllerChannelManager: ControllerChannelManager = null - private var allBrokers : Set[Broker] = null - private var allBrokerIds : Set[Int] = null + private var liveBrokers : Set[Broker] = null + private var liveBrokerIds : Set[Int] = null private var allTopics: Set[String] = null private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null private var allLeaders: mutable.Map[(String, Int), Int] = null - // Return true if this controller succeeds in the controller competition + // Return true if this controller succeeds in the controller leader election private def tryToBecomeController(): Boolean = { - try { - ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString) - // Only the broker successfully registering as the controller can execute following code, otherwise - // some exception will be thrown. 
- registerBrokerChangeListener() - registerTopicChangeListener() - allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet - allBrokerIds = allBrokers.map(_.id) - info("all brokers: %s".format(allBrokerIds)) - allTopics = ZkUtils.getAllTopics(zkClient).toSet - info("all topics: %s".format(allTopics)) - allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator) - info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment)) - allLeaders = new mutable.HashMap[(String, Int), Int] - controllerChannelManager = new ControllerChannelManager(allBrokers, config) - controllerChannelManager.startup() - return true - } catch { - case e: ZkNodeExistsException => - registerControllerExistListener() - info("broker didn't succeed registering as the controller since it's taken by someone else") - return false - case e2 => throw e2 - } + val controllerStatus = + try { + ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString) + // Only the broker elected as the new controller can execute following code, otherwise + // some exception will be thrown. + registerBrokerChangeListener() + registerTopicChangeListener() + liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet + liveBrokerIds = liveBrokers.map(_.id) + info("Currently active brokers in the cluster: %s".format(liveBrokerIds)) + allTopics = ZkUtils.getAllTopics(zkClient).toSet + info("Current list of topics in the cluster: %s".format(allTopics)) + allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator) + info("Partition replica assignment: %s".format(allPartitionReplicaAssignment)) + allLeaders = new mutable.HashMap[(String, Int), Int] + controllerChannelManager = new ControllerChannelManager(liveBrokers, config) + controllerChannelManager.startup() + true + } catch { + case e: ZkNodeExistsException => + registerControllerExistsListener() + false + case e2 => throw e2 + } + controllerStatus } - private def controllerRegisterOrFailover(){ - if(!isRunning){ - info("controller has already been shut down, don't need to compete for lead controller any more") - return - } - info("try to become controller") - if(tryToBecomeController() == true){ - info("won the controller competition and work on leader and isr recovery") - deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics) - debug("work on broker changes") - onBrokerChange() - - // If there are some partition with leader not initialized, init the leader for them - val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1)) - debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders)) - initLeaders(partitionReplicaAssignment) - } + private def controllerRegisterOrFailover() { + if(isRunning) { + if(tryToBecomeController()) { + readAndSendLeaderAndIsrFromZookeeper(liveBrokerIds, allTopics) + onBrokerChange() + // If there are some partition with leader not initialized, init the leader for them + val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1)) + debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders)) + initLeaders(partitionReplicaAssignment) + } + }else + info("Controller has been shut down, aborting startup procedure") } def isActive(): Boolean = { @@ -209,20 +204,22 @@ class KafkaController(config : KafkaConfig, 
zkClient: ZkClient) extends Logging def startup() = { controllerLock synchronized { + info("Controller starting up"); registerSessionExpirationListener() - registerControllerExistListener() + registerControllerExistsListener() isRunning = true controllerRegisterOrFailover() + info("Controller startup complete") } } def shutdown() = { controllerLock synchronized { - if(controllerChannelManager != null){ - info("shut down") + if(controllerChannelManager != null) { + info("Controller shutting down") controllerChannelManager.shutdown() controllerChannelManager = null - info("shutted down completely") + info("Controller shutdown complete") } isRunning = false } @@ -244,8 +241,8 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging zkClient.subscribeStateChanges(new SessionExpireListener()) } - private def registerControllerExistListener(){ - zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener()) + private def registerControllerExistsListener(){ + zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistsListener()) } class SessionExpireListener() extends IZkStateListener with Logging { @@ -265,199 +262,265 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging @throws(classOf[Exception]) def handleNewSession() { controllerLock synchronized { - if(controllerChannelManager != null){ + if(controllerChannelManager != null) { info("session expires, clean up the state") controllerChannelManager.shutdown() controllerChannelManager = null } + controllerRegisterOrFailover() } - controllerRegisterOrFailover() } } /** - * Used to populate the leaderAndISR from zookeeper to affected brokers when the brokers comes up + * @param brokerIds The set of currently active brokers in the cluster, as known to the controller + * @param topics The set of topics known to the controller by reading from zookeeper + * This API reads the list of partitions that exist for all the topics in the specified list of input topics. + * For each of those partitions, it reads the assigned replica list so that it can send the appropriate leader and + * isr state change request to all the brokers in the assigned replica list. It arranges the leader and isr state + * change requests by broker id. At the end, it circles through this map, sending the required INIT state change requests + * to each broker. This API is called when - + * 1. A new broker starts up + * 2. 
A new controller is elected */ - private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = { - val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator) - val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]] - for((topicPartition, leaderAndISR) <- leaderAndISRInfos){ - // If the leader specified in the leaderAndISR is no longer alive, there is no need to recover it - if(allBrokerIds.contains(leaderAndISR.leader)){ - val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition) - if(brokersAssignedToThisPartitionOpt == None){ - warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment)) - } else{ - val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_)) - relatedBrokersAssignedToThisPartition.foreach(b => { - if(!brokerToLeaderAndISRInfosMap.contains(b)) - brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR]) - brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR) - }) - allLeaders.put(topicPartition, leaderAndISR.leader) - } - } else - debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader)) + private def readAndSendLeaderAndIsrFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = { + val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topics.iterator) + val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]] + for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) { + // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it + liveBrokerIds.contains(leaderAndIsr.leader) match { + case true => + val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition) + brokersAssignedToThisPartitionOpt match { + case Some(brokersAssignedToThisPartition) => + val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_)) + relatedBrokersAssignedToThisPartition.foreach(b => { + brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr]) + brokerToLeaderAndIsrInfoMap(b).put(topicPartition, leaderAndIsr) + }) + allLeaders.put(topicPartition, leaderAndIsr.leader) + case None => warn(("While refreshing controller's leader and isr cache, no replica assignment was found " + + "for partition [%s, %d]. 
Rest of the partition replica assignment is %s").format(topicPartition._1, + topicPartition._2, allPartitionReplicaAssignment)) + } + case false => + debug("While refreshing controller's leader and isr cache, broker %d is not alive any more, just ignore it" + .format(leaderAndIsr.leader)) + } } - info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString())) + debug(("While refreshing controller's leader and isr cache, the state change requests for each broker is " + + "[%s]").format(brokerToLeaderAndIsrInfoMap.toString())) - brokerToLeaderAndISRInfosMap.foreach(m =>{ + brokerToLeaderAndIsrInfoMap.foreach(m =>{ val broker = m._1 - val leaderAndISRs = m._2 - val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs) - info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString)) - sendRequest(broker, leaderAndISRRequest) + val leaderAndIsrs = m._2 + val leaderAndIsrRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.IsInit, leaderAndIsrs) + info("After refreshing controller's leader and isr cache, the leader and ISR change state change request sent to" + + " new broker [%s] is [%s]".format(broker, leaderAndIsrRequest.toString)) + sendRequest(broker, leaderAndIsrRequest) }) - - info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders)) + info("After refreshing controller's leader and isr cache for brokers %s, the leaders assignment is %s" + .format(brokerIds, allLeaders)) } - private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) { - val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]] + val brokerToLeaderAndISRInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndIsr]] for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) { - val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r)) + val liveAssignedReplicas = replicaAssignment.filter(r => liveBrokerIds.contains(r)) debug("for topic [%s], partition [%d], live assigned replicas are: [%s]" - .format(topicPartition._1, - topicPartition._2, - liveAssignedReplicas)) - if(!liveAssignedReplicas.isEmpty){ + .format(topicPartition._1, + topicPartition._2, + liveAssignedReplicas)) + if(!liveAssignedReplicas.isEmpty) { debug("live assigned replica is not empty, check zkClient: %s".format(zkClient)) val leader = liveAssignedReplicas.head - var leaderAndISR: LeaderAndISR = null + var leaderAndISR: LeaderAndIsr = null var updateLeaderISRZKPathSucceeded: Boolean = false - while(!updateLeaderISRZKPathSucceeded){ - val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2) + while(!updateLeaderISRZKPathSucceeded) { + val curLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topicPartition._1, topicPartition._2) debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient)) if(curLeaderAndISROpt == None){ debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2)) - leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList) - ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString) + leaderAndISR = new 
LeaderAndIsr(leader, liveAssignedReplicas.toList) + ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2), leaderAndISR.toString) updateLeaderISRZKPathSucceeded = true - } else{ - debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2)) + } else { + debug("During initializing leader of parition (%s, %d),".format(topicPartition._1, topicPartition._2) + + " the current leader and isr in zookeeper is not empty") val curZkPathVersion = curLeaderAndISROpt.get.zkVersion - leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList, curLeaderAndISROpt.get.zkVersion + 1) - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion) - if(updateSucceeded){ + leaderAndISR = new LeaderAndIsr(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList, + curLeaderAndISROpt.get.zkVersion + 1) + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2), + leaderAndISR.toString, curZkPathVersion) + if(updateSucceeded) { leaderAndISR.zkVersion = newVersion } updateLeaderISRZKPathSucceeded = updateSucceeded } } liveAssignedReplicas.foreach(b => { - if(!brokerToLeaderAndISRInfosMap.contains(b)) - brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR]) - brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR) + if(!brokerToLeaderAndISRInfoMap.contains(b)) + brokerToLeaderAndISRInfoMap.put(b, new mutable.HashMap[(String, Int), LeaderAndIsr]) + brokerToLeaderAndISRInfoMap(b).put(topicPartition, leaderAndISR) } ) allLeaders.put(topicPartition, leaderAndISR.leader) } else{ - warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), allBrokerIds)) + warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds)) } } - info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap)) - brokerToLeaderAndISRInfosMap.foreach(m =>{ + info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfoMap)) + brokerToLeaderAndISRInfoMap.foreach(m =>{ val broker = m._1 val leaderAndISRs = m._2 - val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs) + val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRs) info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest)) sendRequest(broker, leaderAndISRRequest) }) } - - private def onBrokerChange(newBrokers: Set[Int] = null){ + /** + * @param newBrokers The list of brokers that are 
started up. This is an optional argument that can be empty when + * new controller is being elected + * The purpose of this API is to send the leader state change request to all live replicas of partitions that + * currently don't have an alive leader. It first finds the partitions with dead leaders, then it looks up the list + * of assigned replicas for those partitions that are alive. It reads the leader and isr info for those partitions + * from zookeeper. + * It can happen that when the controller is in the middle of updating the new leader info in zookeeper, + * the leader changes the ISR for the partition. Due to this, the zookeeper path's version will be different than + * what was known to the controller. So it's new leader update will fail. The controller retries the leader election + * based on the new ISR until it's leader update in zookeeper succeeds. + * Once the write to zookeeper succeeds, it sends the leader state change request to the live assigned replicas for + * each affected partition. + */ + private def onBrokerChange(newBrokers: Set[Int] = Set.empty[Int]) { /** handle the new brokers, send request for them to initialize the local log **/ - if(newBrokers != null && newBrokers.size != 0) - deliverLeaderAndISRFromZookeeper(newBrokers, allTopics) + if(newBrokers.size != 0) + readAndSendLeaderAndIsrFromZookeeper(newBrokers, allTopics) /** handle leader election for the partitions whose leader is no longer alive **/ - val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]] - allLeaders.foreach(m =>{ - val topicPartition = m._1 - val leader = m._2 - // We only care about the partitions, whose leader is no longer alive - if(!allBrokerIds.contains(leader)){ - var updateLeaderISRZKPathSucceeded: Boolean = false - while(!updateLeaderISRZKPathSucceeded){ - val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition) - if(assignedReplicasOpt == None) - throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment)) - val assignedReplicas = assignedReplicasOpt.get - val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r)) - val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2) - if(curLeaderAndISROpt == None){ - throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2)) - } - val curLeaderAndISR = curLeaderAndISROpt.get - val leader = curLeaderAndISR.leader - var newLeader: Int = -1 - val leaderEpoch = curLeaderAndISR.leaderEpoch - val ISR = curLeaderAndISR.ISR - val curZkPathVersion = curLeaderAndISR.zkVersion - debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion)) - // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader - var leaderAndISR: LeaderAndISR = null - // The ISR contains at least 1 broker in the live broker list - val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r)) - if(!liveBrokersInISR.isEmpty){ - newLeader = liveBrokersInISR.head - leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion 
+ 1) - debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString())) - } else{ - debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition)) - if (!liveAssignedReplicasToThisPartition.isEmpty){ - newLeader = liveAssignedReplicasToThisPartition.head - leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1) - warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader)) - } else - error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas)) - } - debug("the leader and ISR converted string: [%s]".format(leaderAndISR)) - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion) - if(updateSucceeded){ - leaderAndISR.zkVersion = newVersion - liveAssignedReplicasToThisPartition.foreach(b => { - if(!brokerToLeaderAndISRInfosMap.contains(b)) - brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR]) - brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR) - }) - allLeaders.put(topicPartition, newLeader) - info("on broker changes, allLeader is updated to %s".format(allLeaders)) - } - updateLeaderISRZKPathSucceeded = updateSucceeded + val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]] + // retain only partitions whose leaders are not alive + val partitionsWithDeadLeaders = allLeaders.filter(partitionAndLeader => !liveBrokerIds.contains(partitionAndLeader._2)) + partitionsWithDeadLeaders.foreach { partitionAndLeader => + val topic = partitionAndLeader._1._1 + val partition = partitionAndLeader._1._2 + + try { + allPartitionReplicaAssignment.get((topic, partition)) match { + case Some(assignedReplicas) => + val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r)) + ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match { + case Some(currentLeaderAndIsr) => + try { + // elect new leader or throw exception + val newLeaderAndIsr = electLeaderForPartition(topic, partition, currentLeaderAndIsr, assignedReplicas) + // store new leader and isr info in cache + liveAssignedReplicasToThisPartition.foreach { b => + brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr]) + brokerToLeaderAndIsrInfoMap(b).put((topic, partition), newLeaderAndIsr) + } + }catch { + case e => error("Error while electing leader for partition [%s, %d]".format(topic, partition)) + } + case None => throw new KafkaException(("On broker changes, " + + "there's no leaderAndISR information for partition (%s, %d) in zookeeper").format(topic, partition)) + } + case None => throw new KafkaException(("While handling broker changes, the " + + "partition [%s, %d] doesn't have assigned replicas. 
The replica assignment for other partitions is %s") + .format(topic, partition, allPartitionReplicaAssignment)) } + }catch { + case e: PartitionOfflineException => + error("All replicas for partition [%s, %d] are dead.".format(topic, partition) + + " Marking this partition offline") } - }) - brokerToLeaderAndISRInfosMap.foreach(m => { + } + debug("After leader election, leader cache is updated to %s".format(allLeaders)) + brokerToLeaderAndIsrInfoMap.foreach(m => { val broker = m._1 - val leaderAndISRInfos = m._2 - val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos) + val leaderAndISRInfo = m._2 + val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRInfo) sendRequest(broker, leaderAndISRRequest) - info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(broker, leaderAndISRRequest)) + info("On broker changes, the LeaderAndIsrRequest send to broker [%d] is [%s]".format(broker, leaderAndISRRequest)) }) } + /** + * @param topic The topic of the partition whose leader needs to be elected + * @param partition The partition whose leader needs to be elected + * @param currentLeaderAndIsr The leader and isr information stored for this partition in zookeeper + * @param assignedReplicas The list of replicas assigned to the input partition + * @throws PartitionOfflineException If no replica in the assigned replicas list is alive + * This API selects a new leader for the input partition - + * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader + * 2. Else, it picks some alive broker from the assigned replica list as the new leader + * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException + * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache + * TODO: If a leader cannot be elected for a partition, it should be marked offline and exposed through some metric + */ + private def electLeaderForPartition(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr, + assignedReplicas: Seq[Int]):LeaderAndIsr = { + var zookeeperPathUpdateSucceeded: Boolean = false + var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr + while(!zookeeperPathUpdateSucceeded) { + val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r)) + val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => liveBrokerIds.contains(r)) + val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch + val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion + debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]" + .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr, + currentLeaderIsrZkPathVersion)) + newLeaderAndIsr = liveBrokersInIsr.isEmpty match { + case true => + debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s" + .format(liveAssignedReplicasToThisPartition.mkString(","))) + liveAssignedReplicasToThisPartition.isEmpty match { + case true => throw new PartitionOfflineException(("No replica for partition " + + "([%s, %d]) is alive. 
Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas)) + case false => + val newLeader = liveAssignedReplicasToThisPartition.head + warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + + "There's potential data loss") + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + } + case false => + val newLeader = liveBrokersInIsr.head + debug("Some broker in ISR is alive, picking the leader from the ISR: " + newLeader) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) + } + info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) + // update the new leadership decision in zookeeper or retry + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), + newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion) + newLeaderAndIsr.zkVersion = newVersion + zookeeperPathUpdateSucceeded = updateSucceeded + } + // update the leader cache + allLeaders.put((topic, partition), newLeaderAndIsr.leader) + newLeaderAndIsr + } + class BrokerChangeListener() extends IZkChildListener with Logging { this.logIdent = "[Controller " + config.brokerId + "], " - def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) { + def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { controllerLock synchronized { - info("broker change listener triggered") - val curChildrenSeq: Seq[String] = javaCurChildren - val curBrokerIdsSeq = curChildrenSeq.map(_.toInt) - val curBrokerIds = curBrokerIdsSeq.toSet - val addedBrokerIds = curBrokerIds -- allBrokerIds - val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq) - val deletedBrokerIds = allBrokerIds -- curBrokerIds - allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet - allBrokerIds = allBrokers.map(_.id) - info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds)) - addedBrokersSeq.foreach(controllerChannelManager.addBroker(_)) + val curBrokerIds = currentBrokerList.map(_.toInt).toSet + val newBrokerIds = curBrokerIds -- liveBrokerIds + val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + val deletedBrokerIds = liveBrokerIds -- curBrokerIds + liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + liveBrokerIds = liveBrokers.map(_.id) + info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s" + .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(","))) + newBrokers.foreach(controllerChannelManager.addBroker(_)) deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_)) - onBrokerChange(addedBrokerIds) + onBrokerChange(newBrokerIds) } } } @@ -465,7 +528,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) { // get relevant partitions to this broker val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1)) - trace("handling new topics, the partition replica assignment to be handled is 
%s".format(partitionReplicaAssignment)) + debug("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment)) initLeaders(partitionReplicaAssignment) } @@ -479,24 +542,24 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging } allLeaders.remove(topicPartition) info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap)) - ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2)) + ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2)) } for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){ val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica) info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest)) sendRequest(broker, stopReplicaRequest) } - /*TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/ + /* TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/ } class TopicChangeListener extends IZkChildListener with Logging { this.logIdent = "[Controller " + config.brokerId + "], " @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + def handleChildChange(parentPath : String, children : java.util.List[String]) { controllerLock synchronized { info("topic/partition change listener fired for path " + parentPath) - val currentChildren = JavaConversions.asBuffer(curChilds).toSet + val currentChildren = JavaConversions.asBuffer(children).toSet val newTopics = currentChildren -- allTopics val deletedTopics = allTopics -- currentChildren val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1)) @@ -512,7 +575,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging } } - class ControllerExistListener extends IZkDataListener with Logging { + class ControllerExistsListener extends IZkDataListener with Logging { this.logIdent = "[Controller " + config.brokerId + "], " @throws(classOf[Exception]) @@ -523,7 +586,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { controllerLock synchronized { - info("the current controller failed, competes to be new controller") + info("Current controller failed, participating in election for a new controller") controllerRegisterOrFailover() } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 33ad9c9fd1d..aaa6cb91c88 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -22,7 +22,7 @@ import org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean import kafka.utils._ import kafka.log.LogManager -import kafka.api.{LeaderAndISRRequest, LeaderAndISR} +import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr} import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} object ReplicaManager { @@ -114,7 +114,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient } } - def 
becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndISRRequest): collection.Map[(String, Int), Short] = { + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = { info("Handling leader and isr request %s".format(leaderAndISRRequest)) val responseMap = new collection.mutable.HashMap[(String, Int), Short] @@ -141,7 +141,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient * If IsInit flag is on, this means that the controller wants to treat topics not in the request * as deleted. */ - if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){ + if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){ startHighWaterMarksCheckPointThread val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1) info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) @@ -151,7 +151,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient responseMap } - private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = { + private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = { info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId) if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) { @@ -163,7 +163,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId)) } - private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) { + private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) { val leaderBrokerId: Int = leaderAndISR.leader info("Starting the follower state transition to follow leader %d for topic %s partition %d" .format(leaderBrokerId, topic, partitionId)) diff --git a/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala index 0a4408cee0c..9cff60c1cd6 100644 --- a/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala @@ -62,18 +62,17 @@ object UpdateOffsetsInZK { "getOffsetsBefore request") } - val brokerInfos = ZkUtils.getBrokerInfoFromIds(zkClient, List(broker)) - if(brokerInfos.size == 0) - throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker)) + ZkUtils.getBrokerInfo(zkClient, broker) match { + case Some(brokerInfo) => + val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024) + val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1) + val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - val brokerInfo = brokerInfos.head - val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024) - val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1) - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - - println("updating partition " + partition + " with new offset: " + offsets(0)) - ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString) - numParts += 1 + println("updating partition " + partition + " with new offset: " + offsets(0)) + ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + 
partition, offsets(0).toString) + numParts += 1 + case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker)) + } } println("updated the offset for " + numParts + " partitions") } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index fe0a08bc1a1..98fa86e42e9 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -26,7 +26,6 @@ import java.util.zip.CRC32 import javax.management._ import scala.collection._ import scala.collection.mutable -import kafka.message.{NoCompressionCodec, CompressionCodec} import org.I0Itec.zkclient.ZkClient import java.util.{Random, Properties} import joptsimple.{OptionSpec, OptionSet, OptionParser} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index f9643d497f0..06d965ac963 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -23,10 +23,10 @@ import org.I0Itec.zkclient.{IZkDataListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} import org.I0Itec.zkclient.serialize.ZkSerializer import scala.collection._ -import kafka.api.LeaderAndISR -import kafka.common.NoEpochForPartitionException +import kafka.api.LeaderAndIsr import org.apache.zookeeper.data.Stat import java.util.concurrent.locks.{ReentrantLock, Condition} +import kafka.common.{KafkaException, NoEpochForPartitionException} object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -43,15 +43,17 @@ object ZkUtils extends Logging { } def getController(zkClient: ZkClient): Int= { - val controller = readDataMaybeNull(zkClient, ControllerPath)._1 - controller.toInt + readDataMaybeNull(zkClient, ControllerPath)._1 match { + case Some(controller) => controller.toInt + case None => throw new KafkaException("Controller doesn't exist") + } } def getTopicPartitionPath(topic: String, partitionId: Int): String ={ getTopicPartitionsPath(topic) + "/" + partitionId } - def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: Int): String ={ + def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={ getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR" } @@ -65,41 +67,42 @@ object ZkUtils extends Logging { def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = { val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted - getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt)) - } - - - def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR] = { - val leaderAndISRPath = getTopicPartitionLeaderAndISRPath(topic, partition) - val ret = readDataMaybeNull(zkClient, leaderAndISRPath) - val leaderAndISRStr: String = ret._1 - val stat = ret._2 - if(leaderAndISRStr == null) None - else { - SyncJSON.parseFull(leaderAndISRStr) match { - case Some(m) => - val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt - val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt - val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get - val ISR = Utils.getCSVList(ISRString).map(r => r.toInt) - val zkPathVersion = stat.getVersion - debug("Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition %d".format(leader, epoch, ISR.toString(), zkPathVersion, topic, partition)) - Some(LeaderAndISR(leader, epoch, ISR.toList, 
zkPathVersion)) - case None => None - } - } + brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) } + def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { + val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition) + val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath) + val leaderAndIsrOpt = leaderAndIsrInfo._1 + val stat = leaderAndIsrInfo._2 + leaderAndIsrOpt match { + case Some(leaderAndIsrStr) => + SyncJSON.parseFull(leaderAndIsrStr) match { + case Some(m) => + val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt + val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt + val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get + val isr = Utils.getCSVList(isrString).map(r => r.toInt) + val zkPathVersion = stat.getVersion + debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, + isr.toString(), zkPathVersion, topic, partition)) + Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion)) + case None => None + } + case None => None // TODO: Handle if leader and isr info is not available in zookeeper + } + } def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = { - val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1 - if(leaderAndISR == null) None - else { - SyncJSON.parseFull(leaderAndISR) match { - case Some(m) => - Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt) - case None => None - } + val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 + leaderAndIsrOpt match { + case Some(leaderAndIsr) => + SyncJSON.parseFull(leaderAndIsr) match { + case Some(m) => + Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt) + case None => None + } + case None => None } } @@ -109,32 +112,32 @@ object ZkUtils extends Logging { * other broker will retry becoming leader with the same new epoch value. 
*/ def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = { - val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1 - if(leaderAndISR != null) { - val epoch = SyncJSON.parseFull(leaderAndISR) match { - case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) - case Some(m) => - m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt - } - epoch + val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 + leaderAndIsrOpt match { + case Some(leaderAndIsr) => + SyncJSON.parseFull(leaderAndIsr) match { + case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) + case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt + } + case None => throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty" + .format(topic, partition)) } - else - throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty".format(topic, partition)) } /** * Gets the in-sync replicas (ISR) for a specific topic and partition */ def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = { - val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1 - if(leaderAndISR == null) Seq.empty[Int] - else { - SyncJSON.parseFull(leaderAndISR) match { - case Some(m) => - val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get - Utils.getCSVList(ISRString).map(r => r.toInt) - case None => Seq.empty[Int] - } + val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 + leaderAndIsrOpt match { + case Some(leaderAndIsr) => + SyncJSON.parseFull(leaderAndIsr) match { + case Some(m) => + val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get + Utils.getCSVList(ISRString).map(r => r.toInt) + case None => Seq.empty[Int] + } + case None => Seq.empty[Int] } } @@ -142,19 +145,18 @@ object ZkUtils extends Logging { * Gets the assigned replicas (AR) for a specific topic and partition */ def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = { - val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 - val assignedReplicas = if (jsonPartitionMap == null) { - Seq.empty[Int] - } else { - SyncJSON.parseFull(jsonPartitionMap) match { - case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match { + val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 + jsonPartitionMapOpt match { + case Some(jsonPartitionMap) => + SyncJSON.parseFull(jsonPartitionMap) match { + case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match { + case None => Seq.empty[Int] + case Some(seq) => seq.map(_.toInt) + } case None => Seq.empty[Int] - case Some(seq) => seq.map(_.toInt) } - case None => Seq.empty[Int] - } + case None => Seq.empty[Int] } - assignedReplicas } def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = { @@ -347,17 +349,16 @@ object ZkUtils extends Logging { (dataStr, stat) } - def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = { + def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = { val 
stat: Stat = new Stat() - var dataStr: String = null - try{ - dataStr = client.readData(path, stat) - return (dataStr, stat) - } catch { - case e: ZkNoNodeException => - return (null, stat) - case e2 => throw e2 - } + val dataAndStat = try { + (Some(client.readData(path, stat)), stat) + } catch { + case e: ZkNoNodeException => + (None, stat) + case e2 => throw e2 + } + dataAndStat } def getChildren(client: ZkClient, path: String): Seq[String] = { @@ -396,59 +397,66 @@ object ZkUtils extends Logging { cluster } - def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), Seq[Int]] = { + def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): + mutable.Map[(String, Int), Seq[Int]] = { val ret = new mutable.HashMap[(String, Int), Seq[Int]] - topics.foreach{ topic => - val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 - if (jsonPartitionMap != null) { - SyncJSON.parseFull(jsonPartitionMap) match { - case Some(m) => - val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] - for((partition, replicas) <- replicaMap){ - ret.put((topic, partition.toInt), replicas.map(_.toInt)) - debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) - } - case None => - } + topics.foreach { topic => + val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 + jsonPartitionMapOpt match { + case Some(jsonPartitionMap) => + SyncJSON.parseFull(jsonPartitionMap) match { + case Some(m) => + val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] + for((partition, replicas) <- replicaMap){ + ret.put((topic, partition.toInt), replicas.map(_.toInt)) + debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas)) + } + case None => + } + case None => } - } + } ret } - def getPartitionLeaderAndISRForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), LeaderAndISR] = { - val ret = new mutable.HashMap[(String, Int), LeaderAndISR] + def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Iterator[String]): + mutable.Map[(String, Int), LeaderAndIsr] = { + val ret = new mutable.HashMap[(String, Int), LeaderAndIsr] val partitionsForTopics = getPartitionsForTopics(zkClient, topics) - for((topic, partitions) <- partitionsForTopics){ - for(partition <- partitions){ - val leaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt) - if(leaderAndISROpt.isDefined) - ret.put((topic, partition.toInt), leaderAndISROpt.get) + for((topic, partitions) <- partitionsForTopics) { + for(partition <- partitions) { + ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match { + case Some(leaderAndIsr) => ret.put((topic, partition.toInt), leaderAndIsr) + case None => + } } } ret } - def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = { + def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): + mutable.Map[String, collection.Map[Int, Seq[Int]]] = { val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]() topics.foreach{ topic => - val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 - val partitionMap = if (jsonPartitionMap == null) { - Map[Int, Seq[Int]]() - } else { - SyncJSON.parseFull(jsonPartitionMap) match { - case Some(m) => - val m1 = m.asInstanceOf[Map[String, Seq[String]]] - m1.map(p => (p._1.toInt, 
p._2.map(_.toInt))) - case None => Map[Int, Seq[Int]]() - } + val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 + val partitionMap = jsonPartitionMapOpt match { + case Some(jsonPartitionMap) => + SyncJSON.parseFull(jsonPartitionMap) match { + case Some(m) => + val m1 = m.asInstanceOf[Map[String, Seq[String]]] + m1.map(p => (p._1.toInt, p._2.map(_.toInt))) + case None => Map[Int, Seq[Int]]() + } + case None => Map[Int, Seq[Int]]() } - debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap)) + debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap)) ret += (topic -> partitionMap) - } + } ret } - def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): mutable.Map[(String, Int), Seq[Int]] = { + def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): + mutable.Map[(String, Int), Seq[Int]] = { val ret = new mutable.HashMap[(String, Int), Seq[Int]] for((topic, partitionAssignment) <- topicPartitionAssignment){ for((partition, replicaAssignment) <- partitionAssignment){ @@ -468,7 +476,8 @@ object ZkUtils extends Logging { } } - def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[(String, Int), Seq[Int]] = { + def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): + Map[(String, Int), Seq[Int]] = { val ret = new mutable.HashMap[(String, Int), Seq[Int]] val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator) topicsAndPartitions.map @@ -499,7 +508,8 @@ object ZkUtils extends Logging { def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = { val dirs = new ZKGroupDirs(group) val consumersInGroup = getConsumersInGroup(zkClient, group) - val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient)) + val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId, + ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient)) consumersInGroup.zip(topicCountMaps).toMap } @@ -522,7 +532,19 @@ object ZkUtils extends Logging { consumersPerTopicMap } - def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1)) + /** + * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker + * or throws an exception if the broker dies before the query to zookeeper finishes + * @param brokerId The broker id + * @param zkClient The zookeeper client connection + * @returns An optional Broker object encapsulating the broker metadata + */ + def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = { + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { + case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo)) + case None => None + } + } def getAllTopics(zkClient: ZkClient): Seq[String] = { val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) diff --git a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala index 
16057dc0967..d0b187e02e0 100644 --- a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala @@ -53,12 +53,12 @@ object RpcDataSerializationTestUtils{ private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) - def createTestLeaderAndISRRequest() : LeaderAndISRRequest = { - val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1) - val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2) + def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = { + val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1) + val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2) val map = Map(((topic1, 0), leaderAndISR1), ((topic2, 0), leaderAndISR2)) - new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map) + new LeaderAndIsrRequest( LeaderAndIsrRequest.NotInit, map) } def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { @@ -129,7 +129,7 @@ class RpcDataSerializationTest extends JUnitSuite { var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes()) leaderAndISRRequest.writeTo(buffer) buffer.rewind() - val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(buffer) + val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer) assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest, deserializedLeaderAndISRRequest) diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 7458c52f540..4f61f8469df 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -60,7 +60,7 @@ class RequestPurgatoryTest extends JUnit3Suite { } @Test - def testRequestExpirey() { + def testRequestExpiry() { val expiration = 20L val r1 = new DelayedRequest(Array("test1"), null, expiration) val r2 = new DelayedRequest(Array("test1"), null, 200000L) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index bbaa7e87b56..21414fb3efb 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -33,9 +33,7 @@ import collection.mutable.ListBuffer import kafka.consumer.ConsumerConfig import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit -import kafka.common.ErrorMapping import kafka.api._ -import collection.mutable.{Map, Set} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} @@ -397,17 +395,17 @@ object TestUtils extends Logging { val partition = leaderForPartition._1 val leader = leaderForPartition._2 try{ - val currentLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition) - var newLeaderAndISR: LeaderAndISR = null + val currentLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + var newLeaderAndISR: LeaderAndIsr = null if(currentLeaderAndISROpt == None) - newLeaderAndISR = new LeaderAndISR(leader, List(leader)) + newLeaderAndISR = new LeaderAndIsr(leader, List(leader)) else{ newLeaderAndISR = currentLeaderAndISROpt.get newLeaderAndISR.leader = leader newLeaderAndISR.leaderEpoch += 1 newLeaderAndISR.zkVersion += 1 } - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath( topic, partition), 
newLeaderAndISR.toString) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), newLeaderAndISR.toString) } catch { case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe) } @@ -426,7 +424,7 @@ object TestUtils extends Logging { leaderLock.lock() try { - zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition), new LeaderExistsOrChangedListener(topic, partition, leaderLock, leaderExistsOrChanged, oldLeaderOpt, zkClient)) + zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), new LeaderExistsOrChangedListener(topic, partition, leaderLock, leaderExistsOrChanged, oldLeaderOpt, zkClient)) leaderExistsOrChanged.await(timeoutMs, TimeUnit.MILLISECONDS) // check if leader is elected val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
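
The heart of this patch is the controller's leader re-election loop: pick a new leader (live ISR member first, otherwise any live assigned replica), then write the decision back to the leaderAndISR path with a conditional, version-checked update so a concurrent ISR change by the old leader cannot be silently overwritten. The following self-contained Scala sketch is illustrative only and is not the Kafka source; ZkStore, electLeader and the in-memory versioning are stand-ins assumed for the example (the real code goes through ZkUtils.conditionalUpdatePersistentPath and KafkaController's caches).

    import scala.collection.mutable

    case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    class PartitionOfflineException(msg: String) extends RuntimeException(msg)

    // In-memory stand-in for a zookeeper path with versioned, conditional writes:
    // the write succeeds only when the caller's expected version matches the stored one.
    class ZkStore {
      private val data = mutable.Map[String, (String, Int)]()
      def conditionalUpdate(path: String, value: String, expectedVersion: Int): (Boolean, Int) =
        data.get(path) match {
          case Some((_, version)) if version == expectedVersion =>
            data(path) = (value, version + 1)
            (true, version + 1)
          case Some((_, version)) =>
            (false, version)
          case None =>
            data(path) = (value, 0)
            (true, 0)
        }
    }

    object LeaderElectionSketch extends App {
      // Decision order mirrors the patch: prefer a live ISR member, fall back to any live
      // assigned replica (accepting potential data loss), otherwise the partition is offline.
      def electLeader(store: ZkStore, path: String, current: LeaderAndIsr,
                      assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): LeaderAndIsr = {
        var elected = current
        var updated = false
        while (!updated) {
          val liveIsr = current.isr.filter(liveBrokerIds.contains)
          val liveAssigned = assignedReplicas.filter(liveBrokerIds.contains)
          elected =
            if (liveIsr.nonEmpty)
              LeaderAndIsr(liveIsr.head, current.leaderEpoch + 1, liveIsr, current.zkVersion + 1)
            else if (liveAssigned.nonEmpty)
              LeaderAndIsr(liveAssigned.head, current.leaderEpoch + 1, List(liveAssigned.head), current.zkVersion + 1)
            else
              throw new PartitionOfflineException("no assigned replica for " + path + " is alive")
          // compare-and-set against the version read earlier; if another writer bumped the
          // version, the update fails and a real controller would re-read the current
          // leader/ISR from zookeeper before retrying the election
          val (ok, newVersion) = store.conditionalUpdate(path, elected.toString, current.zkVersion)
          if (ok) elected = elected.copy(zkVersion = newVersion)
          updated = ok
        }
        elected
      }

      // Example: broker 1 (the old leader) is dead, broker 2 is alive and still in the ISR.
      val store = new ZkStore
      val newState = electLeader(store, "/brokers/topics/test/0/leaderAndISR",
        LeaderAndIsr(leader = 1, leaderEpoch = 3, isr = List(1, 2), zkVersion = 0),
        assignedReplicas = Seq(1, 2, 3), liveBrokerIds = Set(2, 3))
      println(newState) // LeaderAndIsr(2,4,List(2),0)
    }

Bumping leaderEpoch on every election and gating the zookeeper write on zkVersion is what closes the race conditions named in this patch: a stale controller decision can no longer clobber an ISR shrink written by the partition's current leader.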