diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index d94b9ace820..bdc020b4b1e 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -75,6 +75,8 @@ class ConsumerFetcherManager(private val consumerIdString: String, addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) } noLeaderPartitionSet --= leaderForPartitionsMap.keySet + + shutdownIdleFetcherThreads() } catch { case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) } @@ -124,6 +126,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, lock.lock() try { if (partitionMap != null) { + partitionList.foreach(tp => removeFetcher(tp.topic, tp.partition)) noLeaderPartitionSet ++= partitionList cond.signalAll() } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 66728e3ced1..e8702e2f747 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -23,7 +23,7 @@ import kafka.cluster.Broker abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging { // map of (source brokerid, fetcher Id per source broker) => fetcher - private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread] + private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] private val mapLock = new Object this.logIdent = "[" + name + "] " @@ -37,17 +37,17 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) { mapLock synchronized { var fetcherThread: AbstractFetcherThread = null - val key = (sourceBroker, getFetcherId(topic, partitionId)) + val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId)) fetcherThreadMap.get(key) match { case Some(f) => fetcherThread = f case None => - fetcherThread = createFetcherThread(key._2, sourceBroker) + fetcherThread = createFetcherThread(key.fetcherId, sourceBroker) fetcherThreadMap.put(key, fetcherThread) fetcherThread.start } fetcherThread.addPartition(topic, partitionId, initialOffset) info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d" - .format(topic, partitionId, initialOffset, sourceBroker.id, key._2)) + .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId)) } } @@ -56,11 +56,20 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I mapLock synchronized { for ((key, fetcher) <- fetcherThreadMap) { fetcher.removePartition(topic, partitionId) + } + } + } + + def shutdownIdleFetcherThreads() { + mapLock synchronized { + val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId] + for ((key, fetcher) <- fetcherThreadMap) { if (fetcher.partitionCount <= 0) { fetcher.shutdown() - fetcherThreadMap.remove(key) + keysToBeRemoved += key } } + fetcherThreadMap --= keysToBeRemoved } } @@ -73,3 +82,5 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I } } } + +case class BrokerAndFetcherId(broker: Broker, fetcherId: Int) \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index d1b15d34f8e..c2cc3cbdf39 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -29,6 +29,7 @@ import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicLong import kafka.utils.{Pool, ShutdownableThread} import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock /** @@ -38,17 +39,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) extends ShutdownableThread(name) { - private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map - private val fetchMapLock = new Object + private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map + private val partitionMapLock = new ReentrantLock + private val partitionMapCond = partitionMapLock.newCondition() val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id) - val fetchRequestuilder = new FetchRequestBuilder(). - clientId(clientId). - replicaId(fetcherBrokerId). - maxWait(maxWait). - minBytes(minBytes) - /* callbacks to be defined in subclass */ // process fetched data @@ -67,12 +63,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { - fetchMapLock synchronized { - fetchMap.foreach { + val fetchRequestuilder = new FetchRequestBuilder(). + clientId(clientId). + replicaId(fetcherBrokerId). + maxWait(maxWait). + minBytes(minBytes) + + partitionMapLock.lock() + try { + while (partitionMap.isEmpty) + partitionMapCond.await() + partitionMap.foreach { case((topicAndPartition, offset)) => fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, offset, fetchSize) } + } finally { + partitionMapLock.unlock() } val fetchRequest = fetchRequestuilder.build() @@ -85,9 +92,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke case t => debug("error in fetch %s".format(fetchRequest), t) if (isRunning.get) { - fetchMapLock synchronized { - partitionsWithError ++= fetchMap.keys - fetchMap.clear() + partitionMapLock synchronized { + partitionsWithError ++= partitionMap.keys } } } @@ -95,11 +101,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke if (response != null) { // process fetched data - fetchMapLock synchronized { + partitionMapLock.lock() + try { response.data.foreach { case(topicAndPartition, partitionData) => val (topic, partitionId) = topicAndPartition.asTuple - val currentOffset = fetchMap.get(topicAndPartition) + val currentOffset = partitionMap.get(topicAndPartition) if (currentOffset.isDefined) { partitionData.error match { case ErrorMapping.NoError => @@ -109,24 +116,25 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke case Some(m: MessageAndOffset) => m.nextOffset case None => currentOffset.get } - fetchMap.put(topicAndPartition, newOffset) + partitionMap.put(topicAndPartition, newOffset) FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset fetcherMetrics.byteRate.mark(validBytes) // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicAndPartition, currentOffset.get, partitionData) case ErrorMapping.OffsetOutOfRangeCode => val newOffset = handleOffsetOutOfRange(topicAndPartition) - fetchMap.put(topicAndPartition, newOffset) + partitionMap.put(topicAndPartition, newOffset) warn("current offset %d for topic %s partition %d out of range; reset offset to %d" .format(currentOffset.get, topic, partitionId, newOffset)) case _ => - error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), + warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), ErrorMapping.exceptionFor(partitionData.error)) partitionsWithError += topicAndPartition - fetchMap.remove(topicAndPartition) } } } + } finally { + partitionMapLock.unlock() } } @@ -137,26 +145,39 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } def addPartition(topic: String, partitionId: Int, initialOffset: Long) { - fetchMapLock synchronized { - fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset) + partitionMapLock.lock() + try { + partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset) + partitionMapCond.signalAll() + } finally { + partitionMapLock.unlock() } } def removePartition(topic: String, partitionId: Int) { - fetchMapLock synchronized { - fetchMap.remove(TopicAndPartition(topic, partitionId)) + partitionMapLock.lock() + try { + partitionMap.remove(TopicAndPartition(topic, partitionId)) + } finally { + partitionMapLock.unlock() } } def hasPartition(topic: String, partitionId: Int): Boolean = { - fetchMapLock synchronized { - fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined + partitionMapLock.lock() + try { + partitionMap.get(TopicAndPartition(topic, partitionId)).isDefined + } finally { + partitionMapLock.unlock() } } def partitionCount() = { - fetchMapLock synchronized { - fetchMap.size + partitionMapLock.lock() + try { + partitionMap.size + } finally { + partitionMapLock.unlock() } } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cae08b9caf0..a14e0a2f49b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -146,6 +146,8 @@ class KafkaApis(val requestChannel: RequestChannel, val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) + + replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() } /** diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7bb723ea617..42068cad3a2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -206,6 +206,8 @@ class ReplicaManager(val config: KafkaConfig, } responseMap.put(topicAndPartition, errorCode) } + info("Completed leader and isr request %s".format(leaderAndISRRequest)) + replicaFetcherManager.shutdownIdleFetcherThreads() (responseMap, ErrorMapping.NoError) } }