
Consumer rebalance fails if no leader available for a partition and stops all fetchers; patched by Maxime Brugidou; reviewed by Jun Rao; kafka-693

0.8.0-beta1-candidate1
Jun Rao, 12 years ago
commit 214a0af46b
  1. core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (8 changes)
  2. core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (7 changes)
  3. core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (9 changes)
  4. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (31 changes)
  5. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (21 changes)
  6. core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (9 changes)
  7. core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (1 change)
  8. core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (1 change)
  9. core/src/test/scala/unit/kafka/integration/FetcherTest.scala (1 change)
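
In short: before this patch, rebalance looked up the leader of every claimed partition up front and threw NoBrokersForPartitionException as soon as one leader was missing, which stopped all fetchers. With the patch, leaderless partitions stay in the fetcher manager's no-leader set and are retried once a leader appears. The following is a minimal, self-contained Scala sketch of that retry pattern; the type and method names are stand-ins rather than the actual Kafka classes.

    // Sketch only: hypothetical stand-ins for TopicAndPartition and addFetcher.
    object LeaderRetrySketch {
      case class TopicAndPartition(topic: String, partition: Int)

      // Partitions whose leader is currently unknown; retried on every pass.
      val noLeaderPartitions = scala.collection.mutable.Set.empty[TopicAndPartition]

      // Stand-in for ConsumerFetcherManager.addFetcher; may throw if the leader is unreachable.
      def addFetcher(tp: TopicAndPartition, leaderBroker: String): Unit =
        println(s"fetching $tp from $leaderBroker")

      def createFetchers(leaderForPartition: Map[TopicAndPartition, String]): Unit =
        leaderForPartition.foreach { case (tp, leader) =>
          try {
            addFetcher(tp, leader)
            // Remove from the retry set only after addFetcher succeeds, so one bad
            // partition no longer aborts the rest of the pass.
            noLeaderPartitions -= tp
          } catch {
            case t: Throwable => println(s"failed to add fetcher for $tp to $leader: $t")
          }
        }

      def main(args: Array[String]): Unit = {
        noLeaderPartitions += TopicAndPartition("events", 0)
        createFetchers(Map(TopicAndPartition("events", 0) -> "broker-1"))
      }
    }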

core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (8 changes)

@@ -72,9 +72,13 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         leaderForPartitionsMap.foreach{
           case(topicAndPartition, leaderBroker) =>
             val pti = partitionMap(topicAndPartition)
-            addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+            try {
+              addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+              noLeaderPartitionSet -= topicAndPartition
+            } catch {
+              case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+            }
         }
-        noLeaderPartitionSet --= leaderForPartitionsMap.keySet
         shutdownIdleFetcherThreads()
       } catch {

core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (7 changes)

@@ -22,6 +22,7 @@ import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
+import kafka.common.ErrorMapping
 class ConsumerFetcherThread(name: String,
@@ -57,7 +58,11 @@ class ConsumerFetcherThread(name: String,
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
-    val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+    val newOffset = partitionErrorAndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    }
     val pti = partitionMap(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
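
The old line took offsets.head straight off the response, which fails with an unhelpful NoSuchElementException when the partition carries an error code and an empty offset list. The new code inspects the error field first and rethrows the mapped exception, so the caller learns why the offset request failed. A short sketch of the same check, using hypothetical stand-ins for the response type and error mapping:

    // Sketch only: PartitionOffsetsResponse and exceptionFor are hypothetical stand-ins.
    object OffsetResponseCheckSketch {
      case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])

      val NoError: Short = 0

      // Plays the role of ErrorMapping.exceptionFor: map an error code to an exception.
      def exceptionFor(error: Short): Throwable =
        new RuntimeException(s"offset request failed with error code $error")

      // Check the error code before touching offsets.head.
      def extractOffset(resp: PartitionOffsetsResponse): Long = resp.error match {
        case NoError => resp.offsets.head
        case e       => throw exceptionFor(e)
      }

      def main(args: Array[String]): Unit = {
        println(extractOffset(PartitionOffsetsResponse(NoError, Seq(42L)))) // 42
        // extractOffset(PartitionOffsetsResponse(5, Seq.empty)) would throw the mapped exception
      }
    }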

core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (9 changes)

@@ -23,7 +23,6 @@ import kafka.message._
 import kafka.utils.Logging
 class PartitionTopicInfo(val topic: String,
-                         val brokerId: Int,
                          val partitionId: Int,
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
@@ -70,7 +69,7 @@ class PartitionTopicInfo(val topic: String,
    * Get the next fetch offset after this message set
    */
   private def nextOffset(messages: ByteBufferMessageSet): Long = {
-    var nextOffset = -1L
+    var nextOffset = PartitionTopicInfo.InvalidOffset
     val iter = messages.shallowIterator
     while(iter.hasNext)
       nextOffset = iter.next.nextOffset
@@ -80,3 +79,9 @@ class PartitionTopicInfo(val topic: String,
   override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }
+object PartitionTopicInfo {
+  val InvalidOffset = -1L
+  def isOffsetInvalid(offset: Long) = offset < 0L
+}
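
The new companion object gives the magic value -1 a name and a predicate, instead of scattering offset < 0 checks around the consumer. A quick usage sketch (everything outside the two companion members is hypothetical):

    // Sketch only: mirrors the two companion members added above.
    object InvalidOffsetSketch {
      val InvalidOffset = -1L
      def isOffsetInvalid(offset: Long): Boolean = offset < 0L

      def main(args: Array[String]): Unit = {
        val checkpointed: Option[Long] = None               // e.g. no offset stored in ZooKeeper yet
        val initial = checkpointed.getOrElse(InvalidOffset)
        println(isOffsetInvalid(initial))                   // true  -> resolve later in the fetcher thread
        println(isOffsetInvalid(1234L))                     // false -> use the checkpointed offset as-is
      }
    }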

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (31 changes)

@@ -402,18 +402,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val brokers = getAllBrokersInCluster(zkClient)
       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{
         val topic = m.topic
         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
         partitionsPerTopicMap.put(topic, partitions)
-        m.partitionsMetadata.foreach(pmd =>{
-          val partitionId = pmd.partitionId
-          val leaderOpt = pmd.leader
-          if(leaderOpt.isDefined)
-            leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id)
-        })
       })
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
        * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -456,7 +450,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           for (i <- startPart until startPart + nParts) {
             val partition = curPartitions(i)
             info(consumerThreadId + " attempting to claim partition " + partition)
-            addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId)
+            addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
             // record the partition ownership decision
             partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
           }
@@ -573,39 +567,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
   private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
-                                    leaderIdForPartitionsMap: Map[(String, Int), Int],
                                     topicDirs: ZKGroupTopicDirs, partition: Int,
                                     topic: String, consumerThreadId: String) {
     val partTopicInfoMap = currentTopicRegistry.get(topic)
-    // find the leader for this partition
-    val leaderOpt = leaderIdForPartitionsMap.get((topic, partition))
-    leaderOpt match {
-      case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
-        format(partition, topic))
-      case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
-    }
-    val leader = leaderOpt.get
     val znode = topicDirs.consumerOffsetDir + "/" + partition
     val offsetString = readDataMaybeNull(zkClient, znode)._1
-    // If first time starting a consumer, set the initial offset based on the config
+    // If first time starting a consumer, set the initial offset to -1
     val offset =
       offsetString match {
         case Some(offsetStr) => offsetStr.toLong
-        case None =>
-          config.autoOffsetReset match {
-            case OffsetRequest.SmallestTimeString =>
-              SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
-            case OffsetRequest.LargestTimeString =>
-              SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
-          }
+        case None => PartitionTopicInfo.InvalidOffset
       }
     val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
     val consumedOffset = new AtomicLong(offset)
     val fetchedOffset = new AtomicLong(offset)
     val partTopicInfo = new PartitionTopicInfo(topic,
-                                               leader,
                                                partition,
                                                queue,
                                                consumedOffset,
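
Taken together, the hunks in this file mean addPartitionTopicInfo no longer needs a leader at all: the removed block resolved a missing initial offset against the leader according to config.autoOffsetReset, while the new code just parses whatever offset is checkpointed in ZooKeeper and otherwise seeds PartitionTopicInfo.InvalidOffset, leaving the actual reset to the fetcher thread. A compact sketch of that fallback, with a hypothetical stand-in for the ZooKeeper read:

    // Sketch only: readOffsetFromZk stands in for readDataMaybeNull(zkClient, znode)._1.
    object InitialOffsetSketch {
      val InvalidOffset = -1L

      def readOffsetFromZk(znode: String): Option[String] = None

      def initialOffset(znode: String): Long =
        readOffsetFromZk(znode) match {
          case Some(offsetStr) => offsetStr.toLong   // resume from the committed offset
          case None            => InvalidOffset      // first start: defer the reset to the fetcher
        }

      def main(args: Array[String]): Unit =
        println(initialOffset("/consumers/group1/offsets/events/0"))   // prints -1
    }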

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (21 changes)

@@ -18,7 +18,6 @@
 package kafka.server
 import kafka.cluster.Broker
-import kafka.consumer.SimpleConsumer
 import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
@@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
+import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 /**
@@ -125,10 +125,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                 // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                 processPartitionData(topicAndPartition, currentOffset.get, partitionData)
               case ErrorMapping.OffsetOutOfRangeCode =>
-                val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                partitionMap.put(topicAndPartition, newOffset)
-                warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                  .format(currentOffset.get, topic, partitionId, newOffset))
+                try {
+                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                  partitionMap.put(topicAndPartition, newOffset)
+                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                    .format(currentOffset.get, topic, partitionId, newOffset))
+                } catch {
+                  case e =>
+                    warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                    partitionsWithError += topicAndPartition
+                }
               case _ =>
                 warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                   ErrorMapping.exceptionFor(partitionData.error))
@@ -150,7 +156,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
     partitionMapLock.lock()
     try {
-      partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+      val topicPartition = TopicAndPartition(topic, partitionId)
+      partitionMap.put(
+        topicPartition,
+        if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()
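
Two behaviours land in this file: addPartition now resolves a sentinel initial offset immediately through handleOffsetOutOfRange, so a consumer with no checkpoint behaves like one whose offset fell out of range, and a failure during the out-of-range reset parks the partition in partitionsWithError instead of propagating and stopping the thread. The sketch below models both with hypothetical names and omits the locking shown in the diff:

    // Sketch only: hypothetical, single-threaded model of the two changes above.
    object FetcherOffsetSketch {
      case class TopicAndPartition(topic: String, partition: Int)

      val InvalidOffset = -1L
      def isOffsetInvalid(offset: Long): Boolean = offset < 0L

      val partitionMap        = scala.collection.mutable.Map.empty[TopicAndPartition, Long]
      val partitionsWithError = scala.collection.mutable.Set.empty[TopicAndPartition]

      // Stand-in for the subclass hook that asks the leader for a usable offset.
      def handleOffsetOutOfRange(tp: TopicAndPartition): Long = 0L

      def addPartition(tp: TopicAndPartition, initialOffset: Long): Unit = {
        // Resolve the sentinel up front so the fetch loop always sees a real offset.
        val offset = if (isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(tp) else initialOffset
        partitionMap.put(tp, offset)
      }

      def resetOnOutOfRange(tp: TopicAndPartition): Unit = {
        try {
          partitionMap.put(tp, handleOffsetOutOfRange(tp))
        } catch {
          // Leader unreachable: flag the partition for a later retry rather than dying.
          case _: Throwable => partitionsWithError += tp
        }
      }

      def main(args: Array[String]): Unit = {
        addPartition(TopicAndPartition("events", 0), InvalidOffset)
        println(partitionMap(TopicAndPartition("events", 0)))   // prints 0
      }
    }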

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (9 changes)

@@ -19,10 +19,9 @@ package kafka.server
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
-import kafka.common.TopicAndPartition
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
                            brokerConfig: KafkaConfig,
@@ -64,7 +63,11 @@ class ReplicaFetcherThread(name:String,
       replicaId = brokerConfig.brokerId,
       requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
     )
-    val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+    val offset = partitionErrorAndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    }
     val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
     replica.log.get.truncateAndStartWithNewOffset(offset)
     offset

core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (1 change)

@@ -50,7 +50,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
     val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
     val queue = new LinkedBlockingQueue[FetchedDataChunk]
     val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                             c.brokerId,
                                                              0,
                                                              queue,
                                                              new AtomicLong(consumedOffset),

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (1 change)

@@ -312,7 +312,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(topic, topicRegistry.map(r => r._1).head)
     val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
     val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
-    assertEquals(0, brokerPartition.brokerId)
     assertEquals(0, brokerPartition.partitionId)
     // also check partition ownership

core/src/test/scala/unit/kafka/integration/FetcherTest.scala (1 change)

@@ -44,7 +44,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     val shutdown = ZookeeperConsumerConnector.shutdownCommand
     val queue = new LinkedBlockingQueue[FetchedDataChunk]
     val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                             c.brokerId,
                                                              0,
                                                              queue,
                                                              new AtomicLong(0),
