From eeb817ac23f4aa756ebe404a85eeb618a7cb5a08 Mon Sep 17 00:00:00 2001 From: Swapnil Ghike Date: Tue, 15 Jan 2013 09:45:33 -0800 Subject: [PATCH 1/5] KAFKA-697 ConsoleConsumer throws InvalidConfigException for . in client id; reviewed by Jun Rao, Neha Narkhede, John Fung --- core/src/main/scala/kafka/common/Config.scala | 6 +++--- core/src/main/scala/kafka/common/Topic.scala | 8 +++++--- core/src/test/scala/unit/kafka/common/ConfigTest.scala | 8 ++++---- core/src/test/scala/unit/kafka/common/TopicTest.scala | 4 ++-- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/common/Config.scala b/core/src/main/scala/kafka/common/Config.scala index 53bfcfbc6f6..d24fb0df635 100644 --- a/core/src/main/scala/kafka/common/Config.scala +++ b/core/src/main/scala/kafka/common/Config.scala @@ -23,14 +23,14 @@ import kafka.utils.Logging trait Config extends Logging { def validateChars(prop: String, value: String) { - val legalChars = "[a-zA-Z0-9_-]" + val legalChars = "[a-zA-Z0-9\\._\\-]" val rgx = new Regex(legalChars + "*") rgx.findFirstIn(value) match { case Some(t) => if (!t.equals(value)) - throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, _ and -") - case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, _ and -") + throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") + case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") } } } diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index c96ed62bd8b..5bd8f6bced2 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -20,21 +20,23 @@ package kafka.common import util.matching.Regex object Topic { - private val legalChars = "[a-zA-Z0-9_-]" + private val legalChars = "[a-zA-Z0-9\\._\\-]" private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") def validate(topic: String) { if (topic.length <= 0) throw new InvalidTopicException("topic name is illegal, can't be empty") + else if (topic.equals(".") || topic.equals("..")) + throw new InvalidTopicException("topic name cannot be \".\" or \"..\"") else if (topic.length > maxNameLength) throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters") rgx.findFirstIn(topic) match { case Some(t) => if (!t.equals(topic)) - throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") - case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, _ and -") + throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") + case None => throw new InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") } } } diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala index 6226ddaa61b..74118f4cbf7 100644 --- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala +++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala @@ -29,7 +29,7 @@ class ConfigTest { @Test def testInvalidClientIds() { val invalidClientIds = new ArrayBuffer[String]() - val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=') + val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=') for (weirdChar <- badChars) { invalidClientIds += "Is" + weirdChar + "illegal" } @@ -45,7 +45,7 @@ class ConfigTest { } val validClientIds = new ArrayBuffer[String]() - validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "") + validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_.", "") for (i <- 0 until validClientIds.size) { try { ProducerConfig.validateClientId(validClientIds(i)) @@ -59,7 +59,7 @@ class ConfigTest { @Test def testInvalidGroupIds() { val invalidGroupIds = new ArrayBuffer[String]() - val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=') + val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=') for (weirdChar <- badChars) { invalidGroupIds += "Is" + weirdChar + "illegal" } @@ -75,7 +75,7 @@ class ConfigTest { } val validGroupIds = new ArrayBuffer[String]() - validGroupIds += ("valid", "GROUP", "iDs", "ar6", "VaL1d", "_0-9_", "") + validGroupIds += ("valid", "GROUP", "iDs", "ar6", "VaL1d", "_0-9_.", "") for (i <- 0 until validGroupIds.size) { try { ConsumerConfig.validateGroupId(validGroupIds(i)) diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala index b37553e80ab..c8f8f4d8715 100644 --- a/core/src/test/scala/unit/kafka/common/TopicTest.scala +++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala @@ -32,7 +32,7 @@ class TopicTest { for (i <- 1 to 6) longName += longName invalidTopicNames += longName - val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.') + val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=') for (weirdChar <- badChars) { invalidTopicNames += "Is" + weirdChar + "illegal" } @@ -48,7 +48,7 @@ class TopicTest { } val validTopicNames = new ArrayBuffer[String]() - validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_") + validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_.") for (i <- 0 until validTopicNames.size) { try { Topic.validate(validTopicNames(i)) From de1a4d727693c39be19fd9db427746fa6c8a4a12 Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Mon, 14 Jan 2013 09:29:52 -0800 Subject: [PATCH 2/5] KAFKA-698 Avoid advancing the log end offset until the append has actually happened since reads may be happening in the meantime. --- core/src/main/scala/kafka/log/Log.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 79db610ca90..560be19721d 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -269,14 +269,14 @@ private[kafka] class Log(val dir: File, // assign offsets to the messageset val offsets = if(assignOffsets) { - val firstOffset = nextOffset.get - validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec) - val lastOffset = nextOffset.get - 1 + val offsetCounter = new AtomicLong(nextOffset.get) + val firstOffset = offsetCounter.get + validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec) + val lastOffset = offsetCounter.get - 1 (firstOffset, lastOffset) } else { if(!messageSetInfo.offsetsMonotonic) throw new IllegalArgumentException("Out of order offsets found in " + messages) - nextOffset.set(messageSetInfo.lastOffset + 1) (messageSetInfo.firstOffset, messageSetInfo.lastOffset) } @@ -285,6 +285,9 @@ private[kafka] class Log(val dir: File, .format(this.name, offsets._1, nextOffset.get(), validMessages)) segment.append(offsets._1, validMessages) + // advance the log end offset + nextOffset.set(offsets._2 + 1) + // return the offset at which the messages were appended offsets } From 777f66220153a64cd33cd5484a64de556f4fa3a8 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 15 Jan 2013 21:26:45 -0800 Subject: [PATCH 3/5] Disallow clients to set replicaId in FetchRequest; kafka-699; patched by Jun Rao; reviewed by Neha Narkhede --- .../main/scala/kafka/api/FetchRequest.scala | 36 ++++++++++++++----- .../scala/kafka/javaapi/FetchRequest.scala | 6 ++-- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b4fb87451a9..79687471b15 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -58,13 +58,13 @@ object FetchRequest { } } -case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, - correlationId: Int = FetchRequest.DefaultCorrelationId, - clientId: String = ConsumerConfig.DefaultClientId, - replicaId: Int = Request.OrdinaryConsumerId, - maxWait: Int = FetchRequest.DefaultMaxWait, - minBytes: Int = FetchRequest.DefaultMinBytes, - requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) +case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion, + correlationId: Int = FetchRequest.DefaultCorrelationId, + clientId: String = ConsumerConfig.DefaultClientId, + replicaId: Int = Request.OrdinaryConsumerId, + maxWait: Int = FetchRequest.DefaultMaxWait, + minBytes: Int = FetchRequest.DefaultMinBytes, + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { /** @@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, */ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + /** + * Public constructor for the clients + */ + def this(correlationId: Int, + clientId: String, + maxWait: Int, + minBytes: Int, + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) { + this(versionId = FetchRequest.CurrentVersion, + correlationId = correlationId, + clientId = clientId, + replicaId = Request.OrdinaryConsumerId, + maxWait = maxWait, + minBytes= minBytes, + requestInfo = requestInfo) + } + def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) @@ -144,7 +161,10 @@ class FetchRequestBuilder() { this } - def replicaId(replicaId: Int): FetchRequestBuilder = { + /** + * Only for internal use. Clients shouldn't set replicaId. + */ + private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = { this.replicaId = replicaId this } diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala index 44d148e7d89..b4752404895 100644 --- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -18,14 +18,12 @@ package kafka.javaapi import scala.collection.JavaConversions -import kafka.api.PartitionFetchInfo import java.nio.ByteBuffer import kafka.common.TopicAndPartition - +import kafka.api.{Request, PartitionFetchInfo} class FetchRequest(correlationId: Int, clientId: String, - replicaId: Int, maxWait: Int, minBytes: Int, requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { @@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int, kafka.api.FetchRequest( correlationId = correlationId, clientId = clientId, - replicaId = replicaId, + replicaId = Request.OrdinaryConsumerId, maxWait = maxWait, minBytes = minBytes, requestInfo = scalaMap From da7f14676e677bb6b4fc1ea75fe9792709954591 Mon Sep 17 00:00:00 2001 From: Edward Jay Kreps Date: Wed, 16 Jan 2013 10:00:22 -0800 Subject: [PATCH 4/5] KAFKA-702 Deadlock between request handler/processor threads; reviewed by Neha Narkhede, Jun Rao --- core/src/main/scala/kafka/network/RequestChannel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 0e5b7cb8d00..848c877ad34 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -98,7 +98,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) for(i <- 0 until numProcessors) - responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize) + responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]() newGauge( "RequestQueueSize", From 214a0af46b98aa9eaf54d0fbe982bc8ba2ae0a74 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Thu, 17 Jan 2013 09:36:01 -0800 Subject: [PATCH 5/5] Consumer rebalance fails if no leader available for a partition and stops all fetchers; patched by Maxime Brugidou; reviewed by Jun Rao; kafka-693 --- .../consumer/ConsumerFetcherManager.scala | 8 +++-- .../consumer/ConsumerFetcherThread.scala | 7 ++++- .../kafka/consumer/PartitionTopicInfo.scala | 9 ++++-- .../consumer/ZookeeperConsumerConnector.scala | 31 +++---------------- .../kafka/server/AbstractFetcherThread.scala | 21 +++++++++---- .../kafka/server/ReplicaFetcherThread.scala | 9 ++++-- .../kafka/consumer/ConsumerIteratorTest.scala | 1 - .../ZookeeperConsumerConnectorTest.scala | 1 - .../unit/kafka/integration/FetcherTest.scala | 1 - 9 files changed, 44 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index a6cbfb6df06..69c6b3ee185 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -72,9 +72,13 @@ class ConsumerFetcherManager(private val consumerIdString: String, leaderForPartitionsMap.foreach{ case(topicAndPartition, leaderBroker) => val pti = partitionMap(topicAndPartition) - addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) + try { + addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) + noLeaderPartitionSet -= topicAndPartition + } catch { + case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t) + } } - noLeaderPartitionSet --= leaderForPartitionsMap.keySet shutdownIdleFetcherThreads() } catch { diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 713c7c99097..1135f5d77bc 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -22,6 +22,7 @@ import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData} import kafka.common.TopicAndPartition +import kafka.common.ErrorMapping class ConsumerFetcherThread(name: String, @@ -57,7 +58,11 @@ class ConsumerFetcherThread(name: String, case _ => startTimestamp = OffsetRequest.LatestTime } val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1))) - val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition) + val newOffset = partitionErrorAndOffset.error match { + case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head + case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error) + } val pti = partitionMap(topicAndPartition) pti.resetFetchOffset(newOffset) pti.resetConsumeOffset(newOffset) diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 6003cab4f0f..97922445102 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -23,7 +23,6 @@ import kafka.message._ import kafka.utils.Logging class PartitionTopicInfo(val topic: String, - val brokerId: Int, val partitionId: Int, private val chunkQueue: BlockingQueue[FetchedDataChunk], private val consumedOffset: AtomicLong, @@ -70,7 +69,7 @@ class PartitionTopicInfo(val topic: String, * Get the next fetch offset after this message set */ private def nextOffset(messages: ByteBufferMessageSet): Long = { - var nextOffset = -1L + var nextOffset = PartitionTopicInfo.InvalidOffset val iter = messages.shallowIterator while(iter.hasNext) nextOffset = iter.next.nextOffset @@ -80,3 +79,9 @@ class PartitionTopicInfo(val topic: String, override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get + ": consumed offset = " + consumedOffset.get } + +object PartitionTopicInfo { + val InvalidOffset = -1L + + def isOffsetInvalid(offset: Long) = offset < 0L +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 42a962893a8..c1f8513c4a6 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -402,18 +402,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val brokers = getAllBrokersInCluster(zkClient) val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] - val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int] topicsMetadata.foreach(m =>{ val topic = m.topic val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) partitionsPerTopicMap.put(topic, partitions) - m.partitionsMetadata.foreach(pmd =>{ - val partitionId = pmd.partitionId - val leaderOpt = pmd.leader - if(leaderOpt.isDefined) - leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id) - }) }) + /** * fetchers must be stopped to avoid data duplication, since if the current * rebalancing attempt fails, the partitions that are released could be owned by another consumer. @@ -456,7 +450,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, for (i <- startPart until startPart + nParts) { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) - addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId) + addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId) // record the partition ownership decision partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) } @@ -573,39 +567,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], - leaderIdForPartitionsMap: Map[(String, Int), Int], topicDirs: ZKGroupTopicDirs, partition: Int, topic: String, consumerThreadId: String) { val partTopicInfoMap = currentTopicRegistry.get(topic) - // find the leader for this partition - val leaderOpt = leaderIdForPartitionsMap.get((topic, partition)) - leaderOpt match { - case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s". - format(partition, topic)) - case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l)) - } - val leader = leaderOpt.get - val znode = topicDirs.consumerOffsetDir + "/" + partition val offsetString = readDataMaybeNull(zkClient, znode)._1 - // If first time starting a consumer, set the initial offset based on the config + // If first time starting a consumer, set the initial offset to -1 val offset = offsetString match { case Some(offsetStr) => offsetStr.toLong - case None => - config.autoOffsetReset match { - case OffsetRequest.SmallestTimeString => - SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId) - case OffsetRequest.LargestTimeString => - SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId) - } + case None => PartitionTopicInfo.InvalidOffset } val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) val fetchedOffset = new AtomicLong(offset) val partTopicInfo = new PartitionTopicInfo(topic, - leader, partition, queue, consumedOffset, diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index bdb1d03e3c6..3cba7438e32 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.cluster.Broker -import kafka.consumer.SimpleConsumer import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping} import collection.mutable import kafka.message.ByteBufferMessageSet @@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong import kafka.utils.{Pool, ShutdownableThread} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock +import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} /** @@ -125,10 +125,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicAndPartition, currentOffset.get, partitionData) case ErrorMapping.OffsetOutOfRangeCode => - val newOffset = handleOffsetOutOfRange(topicAndPartition) - partitionMap.put(topicAndPartition, newOffset) - warn("current offset %d for topic %s partition %d out of range; reset offset to %d" - .format(currentOffset.get, topic, partitionId, newOffset)) + try { + val newOffset = handleOffsetOutOfRange(topicAndPartition) + partitionMap.put(topicAndPartition, newOffset) + warn("current offset %d for topic %s partition %d out of range; reset offset to %d" + .format(currentOffset.get, topic, partitionId, newOffset)) + } catch { + case e => + warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e) + partitionsWithError += topicAndPartition + } case _ => warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), ErrorMapping.exceptionFor(partitionData.error)) @@ -150,7 +156,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke def addPartition(topic: String, partitionId: Int, initialOffset: Long) { partitionMapLock.lock() try { - partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset) + val topicPartition = TopicAndPartition(topic, partitionId) + partitionMap.put( + topicPartition, + if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset) partitionMapCond.signalAll() } finally { partitionMapLock.unlock() diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 6ae601ed7db..79b3fa38728 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -19,10 +19,9 @@ package kafka.server import kafka.cluster.Broker import kafka.message.ByteBufferMessageSet -import kafka.common.TopicAndPartition +import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData} - class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, @@ -64,7 +63,11 @@ class ReplicaFetcherThread(name:String, replicaId = brokerConfig.brokerId, requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)) ) - val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition) + val offset = partitionErrorAndOffset.error match { + case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head + case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error) + } val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get replica.log.get.truncateAndStartWithNewOffset(offset) offset diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 0b5363f34a8..8ae30ea785d 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -50,7 +50,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, - c.brokerId, 0, queue, new AtomicLong(consumedOffset), diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e12f5a72c43..f7ee914d4da 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -312,7 +312,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(topic, topicRegistry.map(r => r._1).head) val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2))) val brokerPartition = topicsAndPartitionsInRegistry.head._2.head - assertEquals(0, brokerPartition.brokerId) assertEquals(0, brokerPartition.partitionId) // also check partition ownership diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 61d9fc98c24..5a57bd14526 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -44,7 +44,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { val shutdown = ZookeeperConsumerConnector.shutdownCommand val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, - c.brokerId, 0, queue, new AtomicLong(0),