
merge from 0.8 and resolve conflict in Log

pull/2/head
Jun Rao, 12 years ago
commit 9ee795ac56
  1. core/src/main/scala/kafka/api/FetchRequest.scala (36 lines changed)
  2. core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (8 lines changed)
  3. core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (7 lines changed)
  4. core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (9 lines changed)
  5. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (31 lines changed)
  6. core/src/main/scala/kafka/javaapi/FetchRequest.scala (6 lines changed)
  7. core/src/main/scala/kafka/log/Log.scala (11 lines changed)
  8. core/src/main/scala/kafka/network/RequestChannel.scala (2 lines changed)
  9. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (21 lines changed)
  10. core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (9 lines changed)
  11. core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (1 line changed)
  12. core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (1 line changed)
  13. core/src/test/scala/unit/kafka/integration/FetcherTest.scala (1 line changed)

core/src/main/scala/kafka/api/FetchRequest.scala (36 lines changed)

@@ -58,13 +58,13 @@ object FetchRequest {
}
}
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
/**
@@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
*/
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
/**
* Public constructor for the clients
*/
def this(correlationId: Int,
clientId: String,
maxWait: Int,
minBytes: Int,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
this(versionId = FetchRequest.CurrentVersion,
correlationId = correlationId,
clientId = clientId,
replicaId = Request.OrdinaryConsumerId,
maxWait = maxWait,
minBytes= minBytes,
requestInfo = requestInfo)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
@@ -144,7 +161,10 @@ class FetchRequestBuilder() {
this
}
def replicaId(replicaId: Int): FetchRequestBuilder = {
/**
* Only for internal use. Clients shouldn't set replicaId.
*/
private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = {
this.replicaId = replicaId
this
}
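
Reviewer note: the change here is a constructor-visibility pattern, where the full case-class constructor becomes private[kafka] so outside code cannot set replicaId or versionId, and clients only see a narrower public auxiliary constructor plus a private[kafka] builder setter. A minimal standalone sketch of the same idea, using made-up Query/internalFlag names rather than Kafka's classes:

    package example

    // Full constructor is package-private: only code inside `example` may set internalFlag.
    case class Query private[example] (text: String,
                                       limit: Int = 10,
                                       internalFlag: Int = -1) {
      // Public auxiliary constructor exposing only the client-safe parameters;
      // internalFlag is always pinned to -1 for outside callers.
      def this(text: String, limit: Int) =
        this(text = text, limit = limit, internalFlag = -1)
    }

    object QueryDemo {
      def main(args: Array[String]): Unit = {
        val q = new Query("recent orders", 5)   // clients use the public constructor
        println(q)                              // Query(recent orders,5,-1)
      }
    }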

core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (8 lines changed)

@@ -72,9 +72,13 @@ class ConsumerFetcherManager(private val consumerIdString: String,
leaderForPartitionsMap.foreach{
case(topicAndPartition, leaderBroker) =>
val pti = partitionMap(topicAndPartition)
addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
try {
addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
noLeaderPartitionSet -= topicAndPartition
} catch {
case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
}
}
noLeaderPartitionSet --= leaderForPartitionsMap.keySet
shutdownIdleFetcherThreads()
} catch {
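
Reviewer note: the fetcher-manager fix changes the loop's failure handling, so each addFetcher call gets its own try/catch and a partition leaves the no-leader set only after its fetcher was added successfully, instead of the whole set being cleared after the loop. A generic sketch of that per-item pattern, with hypothetical pendingItems/register names:

    import scala.collection.mutable
    import scala.util.control.NonFatal

    object PerItemRetryDemo {
      // Items that still need a fetcher; a failed item stays here for the next pass.
      private val pendingItems = mutable.Set("a", "b", "c")

      private def register(item: String): Unit =
        if (item == "b") throw new RuntimeException("leader not available")

      def main(args: Array[String]): Unit = {
        pendingItems.toSeq.foreach { item =>
          try {
            register(item)
            pendingItems -= item     // remove only after the registration succeeded
          } catch {
            case NonFatal(t) => println(s"failed to register $item, will retry: ${t.getMessage}")
          }
        }
        println(s"still pending: $pendingItems")   // only "b" remains
      }
    }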

core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (7 lines changed)

@@ -22,6 +22,7 @@ import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet
import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
import kafka.common.TopicAndPartition
import kafka.common.ErrorMapping
class ConsumerFetcherThread(name: String,
@@ -57,7 +58,11 @@ class ConsumerFetcherThread(name: String,
case _ => startTimestamp = OffsetRequest.LatestTime
}
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
val newOffset = partitionErrorAndOffset.error match {
case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
}
val pti = partitionMap(topicAndPartition)
pti.resetFetchOffset(newOffset)
pti.resetConsumeOffset(newOffset)
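
Reviewer note: instead of calling offsets.head on whatever the broker returned, the thread now inspects the per-partition error code first and surfaces broker-side failures as mapped exceptions. A simplified sketch of that check, where OffsetResponse and its error codes are invented stand-ins rather than Kafka's API:

    object OffsetResponseCheckDemo {
      // Invented stand-in for a per-partition offset response.
      final case class OffsetResponse(error: Short, offsets: Seq[Long])

      val NoError: Short = 0

      // Read the first returned offset only when the broker reported success;
      // otherwise turn the error code into an exception for the caller to handle.
      def firstOffsetOrThrow(resp: OffsetResponse): Long = resp.error match {
        case NoError => resp.offsets.head
        case code    => throw new RuntimeException(s"offset request failed with error code $code")
      }

      def main(args: Array[String]): Unit = {
        println(firstOffsetOrThrow(OffsetResponse(NoError, Seq(42L))))   // 42
        // firstOffsetOrThrow(OffsetResponse(error = 6, offsets = Seq.empty)) would throw
      }
    }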

core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (9 lines changed)

@@ -23,7 +23,6 @@ import kafka.message._
import kafka.utils.Logging
class PartitionTopicInfo(val topic: String,
val brokerId: Int,
val partitionId: Int,
private val chunkQueue: BlockingQueue[FetchedDataChunk],
private val consumedOffset: AtomicLong,
@@ -70,7 +69,7 @@ class PartitionTopicInfo(val topic: String,
* Get the next fetch offset after this message set
*/
private def nextOffset(messages: ByteBufferMessageSet): Long = {
var nextOffset = -1L
var nextOffset = PartitionTopicInfo.InvalidOffset
val iter = messages.shallowIterator
while(iter.hasNext)
nextOffset = iter.next.nextOffset
@@ -80,3 +79,9 @@ class PartitionTopicInfo(val topic: String,
override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}
object PartitionTopicInfo {
val InvalidOffset = -1L
def isOffsetInvalid(offset: Long) = offset < 0L
}
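
Reviewer note: the companion object gives the -1L sentinel a name and a predicate so callers stop comparing against a magic number. A tiny self-contained sketch of the same convention (Cursor is a made-up class, not part of Kafka):

    class Cursor(initial: Long) {
      private var position: Long = initial
      def advanceTo(p: Long): Unit = position = p
      def needsReset: Boolean = Cursor.isOffsetInvalid(position)
    }

    object Cursor {
      // One named sentinel instead of a bare -1L sprinkled through the code.
      val InvalidOffset: Long = -1L
      def isOffsetInvalid(offset: Long): Boolean = offset < 0L
    }

    object CursorDemo {
      def main(args: Array[String]): Unit = {
        val c = new Cursor(Cursor.InvalidOffset)
        println(c.needsReset)   // true: a real offset still has to be resolved
        c.advanceTo(100L)
        println(c.needsReset)   // false
      }
    }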

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (31 lines changed)

@@ -405,18 +405,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val brokers = getAllBrokersInCluster(zkClient)
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
topicsMetadata.foreach(m =>{
val topic = m.topic
val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
partitionsPerTopicMap.put(topic, partitions)
m.partitionsMetadata.foreach(pmd =>{
val partitionId = pmd.partitionId
val leaderOpt = pmd.leader
if(leaderOpt.isDefined)
leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id)
})
})
/**
* fetchers must be stopped to avoid data duplication, since if the current
* rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -459,7 +453,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId)
addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
// record the partition ownership decision
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
@@ -576,39 +570,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
leaderIdForPartitionsMap: Map[(String, Int), Int],
topicDirs: ZKGroupTopicDirs, partition: Int,
topic: String, consumerThreadId: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)
// find the leader for this partition
val leaderOpt = leaderIdForPartitionsMap.get((topic, partition))
leaderOpt match {
case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
format(partition, topic))
case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
}
val leader = leaderOpt.get
val znode = topicDirs.consumerOffsetDir + "/" + partition
val offsetString = readDataMaybeNull(zkClient, znode)._1
// If first time starting a consumer, set the initial offset based on the config
// If first time starting a consumer, set the initial offset to -1
val offset =
offsetString match {
case Some(offsetStr) => offsetStr.toLong
case None =>
config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
case OffsetRequest.LargestTimeString =>
SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
}
case None => PartitionTopicInfo.InvalidOffset
}
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
leader,
partition,
queue,
consumedOffset,
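
Reviewer note: the rebalance path no longer looks up a partition leader or asks it for a starting offset; when ZooKeeper has no committed offset, the registry records the invalid sentinel and leaves resolution to the fetcher thread (see the AbstractFetcherThread.addPartition change further down). A rough sketch of that lookup-or-sentinel step, where readCommittedOffset is a hypothetical helper returning Option[String] in the spirit of readDataMaybeNull:

    object InitialOffsetDemo {
      val InvalidOffset = -1L

      // Hypothetical stand-in for reading a consumer group's committed offset from
      // ZooKeeper; None means no offset has ever been committed for the partition.
      def readCommittedOffset(topic: String, partition: Int): Option[String] =
        if (partition == 0) Some("250") else None

      def initialOffset(topic: String, partition: Int): Long =
        readCommittedOffset(topic, partition) match {
          case Some(offsetStr) => offsetStr.toLong   // resume from the committed position
          case None            => InvalidOffset      // defer resolution to the fetcher
        }

      def main(args: Array[String]): Unit = {
        println(initialOffset("events", 0))   // 250
        println(initialOffset("events", 1))   // -1: auto.offset.reset is applied later
      }
    }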

core/src/main/scala/kafka/javaapi/FetchRequest.scala (6 lines changed)

@@ -18,14 +18,12 @@
package kafka.javaapi
import scala.collection.JavaConversions
import kafka.api.PartitionFetchInfo
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionFetchInfo}
class FetchRequest(correlationId: Int,
clientId: String,
replicaId: Int,
maxWait: Int,
minBytes: Int,
requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
@@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int,
kafka.api.FetchRequest(
correlationId = correlationId,
clientId = clientId,
replicaId = replicaId,
replicaId = Request.OrdinaryConsumerId,
maxWait = maxWait,
minBytes = minBytes,
requestInfo = scalaMap
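
Reviewer note: on the Java API side the constructor still accepts replicaId for source compatibility, but the value is no longer forwarded; the underlying request is always built as an ordinary consumer. A small sketch of a wrapper pinning an internal parameter this way (Inner and Wrapper are illustrative names only):

    object PinnedParameterDemo {
      val OrdinaryConsumerId: Int = -1

      final case class Inner(clientId: String, replicaId: Int)

      // The wrapper keeps replicaId in its signature but deliberately ignores it,
      // so external callers cannot masquerade as a broker replica.
      class Wrapper(clientId: String, replicaId: Int) {
        val underlying: Inner = Inner(clientId, OrdinaryConsumerId)
      }

      def main(args: Array[String]): Unit =
        println(new Wrapper("my-client", 7).underlying)   // Inner(my-client,-1)
    }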

core/src/main/scala/kafka/log/Log.scala (11 lines changed)

@@ -196,19 +196,20 @@ class Log(val dir: File,
if(assignOffsets) {
// assign offsets to the messageset
appendInfo.firstOffset = nextOffset.get
validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec)
appendInfo.lastOffset = nextOffset.get - 1
val offsetCounter = new AtomicLong(nextOffset.get)
validMessages = validMessages.assignOffsets(offsetCounter, appendInfo.codec)
appendInfo.lastOffset = offsetCounter.get - 1
} else {
// we are taking the offsets we are given
if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
nextOffset.set(appendInfo.lastOffset + 1)
}
// now append to the log
trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
segment.append(appendInfo.firstOffset, validMessages)
nextOffset.set(appendInfo.lastOffset + 1)
// maybe flush the log and index
maybeFlush(appendInfo.count)
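
Reviewer note: the conflict resolution in Log keeps offset assignment on a local AtomicLong and advances the shared nextOffset only after the append to the segment, so the shared counter moves only once the messages have actually been handed to the segment. A condensed sketch of that ordering, with appendToSegment standing in for the real write:

    import java.util.concurrent.atomic.AtomicLong

    object PublishAfterAppendDemo {
      // Shared end-of-log counter, visible to other appenders and readers.
      private val nextOffset = new AtomicLong(100L)

      private def appendToSegment(firstOffset: Long, count: Int): Unit =
        println(s"wrote $count messages at offsets $firstOffset..${firstOffset + count - 1}")

      def append(messageCount: Int): Unit = {
        // Assign offsets against a local counter; the shared value is untouched so far.
        val offsetCounter = new AtomicLong(nextOffset.get)
        val firstOffset = offsetCounter.getAndAdd(messageCount)
        val lastOffset = offsetCounter.get - 1

        appendToSegment(firstOffset, messageCount)

        // Publish the new end offset only after the write succeeded.
        nextOffset.set(lastOffset + 1)
      }

      def main(args: Array[String]): Unit = {
        append(5)
        println(nextOffset.get)   // 105
      }
    }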

core/src/main/scala/kafka/network/RequestChannel.scala (2 lines changed)

@@ -98,7 +98,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
newGauge(
"RequestQueueSize",

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (21 lines changed)

@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer
import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import collection.mutable
import kafka.message.ByteBufferMessageSet
@@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong
import kafka.utils.{Pool, ShutdownableThread}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
/**
@@ -125,10 +125,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentOffset.get, partitionData)
case ErrorMapping.OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange(topicAndPartition)
partitionMap.put(topicAndPartition, newOffset)
warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset))
try {
val newOffset = handleOffsetOutOfRange(topicAndPartition)
partitionMap.put(topicAndPartition, newOffset)
warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset))
} catch {
case e =>
warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e)
partitionsWithError += topicAndPartition
}
case _ =>
warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
ErrorMapping.exceptionFor(partitionData.error))
@@ -150,7 +156,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
partitionMapLock.lock()
try {
partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset)
val topicPartition = TopicAndPartition(topic, partitionId)
partitionMap.put(
topicPartition,
if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
partitionMapCond.signalAll()
} finally {
partitionMapLock.unlock()
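
Reviewer note: addPartition now resolves the invalid sentinel up front through the same handleOffsetOutOfRange hook the fetch loop uses, and that hook's failures in the fetch loop are caught and parked in partitionsWithError instead of killing the thread. A compact sketch of the addPartition side, assuming an isOffsetInvalid predicate like the one added to PartitionTopicInfo:

    import scala.collection.mutable

    object AddPartitionDemo {
      val InvalidOffset = -1L
      def isOffsetInvalid(offset: Long): Boolean = offset < 0L

      // Placeholder for the hook that asks the broker for a usable starting offset
      // (earliest or latest, depending on configuration).
      def handleOffsetOutOfRange(partition: Int): Long = 0L

      private val partitionMap = mutable.Map.empty[Int, Long]

      def addPartition(partition: Int, initialOffset: Long): Unit = {
        val startOffset =
          if (isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(partition)
          else initialOffset
        partitionMap.put(partition, startOffset)
      }

      def main(args: Array[String]): Unit = {
        addPartition(0, 250L)           // a committed offset is used as-is
        addPartition(1, InvalidOffset)  // the sentinel is resolved via the hook
        partitionMap.toSeq.sortBy(_._1).foreach { case (p, o) => println(s"partition $p starts at $o") }
      }
    }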

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (9 lines changed)

@@ -19,10 +19,9 @@ package kafka.server
import kafka.cluster.Broker
import kafka.message.ByteBufferMessageSet
import kafka.common.TopicAndPartition
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
class ReplicaFetcherThread(name:String,
sourceBroker: Broker,
brokerConfig: KafkaConfig,
@@ -64,7 +63,11 @@ class ReplicaFetcherThread(name:String,
replicaId = brokerConfig.brokerId,
requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
)
val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
val offset = partitionErrorAndOffset.error match {
case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
}
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
replica.log.get.truncateFullyAndStartAt(offset)
offset

core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (1 line changed)

@@ -50,7 +50,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
c.brokerId,
0,
queue,
new AtomicLong(consumedOffset),

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (1 line changed)

@@ -312,7 +312,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertEquals(topic, topicRegistry.map(r => r._1).head)
val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
assertEquals(0, brokerPartition.brokerId)
assertEquals(0, brokerPartition.partitionId)
// also check partition ownership

core/src/test/scala/unit/kafka/integration/FetcherTest.scala (1 line changed)

@@ -44,7 +44,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
c.brokerId,
0,
queue,
new AtomicLong(0),
