@ -29,6 +29,7 @@ import com.yammer.metrics.core.Gauge
@@ -29,6 +29,7 @@ import com.yammer.metrics.core.Gauge
import java.util.concurrent.atomic.AtomicLong
import kafka.utils. { Pool , ShutdownableThread }
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
/* *
@ -38,17 +39,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
@@ -38,17 +39,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
fetchSize : Int , fetcherBrokerId : Int = - 1 , maxWait : Int = 0 , minBytes : Int = 1 )
extends ShutdownableThread ( name ) {
private val fetchMap = new mutable . HashMap [ TopicAndPartition , Long ] // a ( topic , partition ) -> offset map
private val fetchMapLock = new Object
private val partitionMap = new mutable . HashMap [ TopicAndPartition , Long ] // a ( topic , partition ) -> offset map
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock . newCondition ( )
val simpleConsumer = new SimpleConsumer ( sourceBroker . host , sourceBroker . port , socketTimeout , socketBufferSize )
val fetcherMetrics = FetcherStat . getFetcherStat ( name + "-" + sourceBroker . id )
val fetchRequestuilder = new FetchRequestBuilder ( ) .
clientId ( clientId ) .
replicaId ( fetcherBrokerId ) .
maxWait ( maxWait ) .
minBytes ( minBytes )
/* callbacks to be defined in subclass */
// process fetched data
@ -67,12 +63,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
@@ -67,12 +63,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
override def doWork ( ) {
fetchMapLock synchronized {
fetchMap . foreach {
val fetchRequestuilder = new FetchRequestBuilder ( ) .
clientId ( clientId ) .
replicaId ( fetcherBrokerId ) .
maxWait ( maxWait ) .
minBytes ( minBytes )
partitionMapLock . lock ( )
try {
while ( partitionMap . isEmpty )
partitionMapCond . await ( )
partitionMap . foreach {
case ( ( topicAndPartition , offset ) ) =>
fetchRequestuilder . addFetch ( topicAndPartition . topic , topicAndPartition . partition ,
offset , fetchSize )
}
} finally {
partitionMapLock . unlock ( )
}
val fetchRequest = fetchRequestuilder . build ( )
@ -85,9 +92,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
@@ -85,9 +92,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
case t =>
debug ( "error in fetch %s" . format ( fetchRequest ) , t )
if ( isRunning . get ) {
fetchMapLock synchronized {
partitionsWithError ++= fetchMap . keys
fetchMap . clear ( )
partitionMapLock synchronized {
partitionsWithError ++= partitionMap . keys
}
}
}
@ -95,11 +101,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
@@ -95,11 +101,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
if ( response != null ) {
// process fetched data
fetchMapLock synchronized {
partitionMapLock . lock ( )
try {
response . data . foreach {
case ( topicAndPartition , partitionData ) =>
val ( topic , partitionId ) = topicAndPartition . asTuple
val currentOffset = fetch Map. get ( topicAndPartition )
val currentOffset = partition Map. get ( topicAndPartition )
if ( currentOffset . isDefined ) {
partitionData . error match {
case ErrorMapping . NoError =>
@ -109,24 +116,25 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
@@ -109,24 +116,25 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
case Some ( m : MessageAndOffset ) => m . nextOffset
case None => currentOffset . get
}
fetch Map. put ( topicAndPartition , newOffset )
partition Map. put ( topicAndPartition , newOffset )
FetcherLagMetrics . getFetcherLagMetrics ( topic , partitionId ) . lag = partitionData . hw - newOffset
fetcherMetrics . byteRate . mark ( validBytes )
// Once we hand off the partition data to the subclass , we can 't mess with it any more in this thread
processPartitionData ( topicAndPartition , currentOffset . get , partitionData )
case ErrorMapping . OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange ( topicAndPartition )
fetch Map. put ( topicAndPartition , newOffset )
partition Map. put ( topicAndPartition , newOffset )
warn ( "current offset %d for topic %s partition %d out of range; reset offset to %d"
. format ( currentOffset . get , topic , partitionId , newOffset ) )
case _ =>
error ( "error for %s %d to broker %d" . format ( topic , partitionId , sourceBroker . id ) ,
warn ( "error for %s %d to broker %d" . format ( topic , partitionId , sourceBroker . id ) ,
ErrorMapping . exceptionFor ( partitionData . error ) )
partitionsWithError += topicAndPartition
fetchMap . remove ( topicAndPartition )
}
}
}
} finally {
partitionMapLock . unlock ( )
}
}
@ -137,26 +145,39 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
@@ -137,26 +145,39 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
def addPartition ( topic : String , partitionId : Int , initialOffset : Long ) {
fetchMapLock synchronized {
fetchMap . put ( TopicAndPartition ( topic , partitionId ) , initialOffset )
partitionMapLock . lock ( )
try {
partitionMap . put ( TopicAndPartition ( topic , partitionId ) , initialOffset )
partitionMapCond . signalAll ( )
} finally {
partitionMapLock . unlock ( )
}
}
def removePartition ( topic : String , partitionId : Int ) {
fetchMapLock synchronized {
fetchMap . remove ( TopicAndPartition ( topic , partitionId ) )
partitionMapLock . lock ( )
try {
partitionMap . remove ( TopicAndPartition ( topic , partitionId ) )
} finally {
partitionMapLock . unlock ( )
}
}
def hasPartition ( topic : String , partitionId : Int ) : Boolean = {
fetchMapLock synchronized {
fetchMap . get ( TopicAndPartition ( topic , partitionId ) ) . isDefined
partitionMapLock . lock ( )
try {
partitionMap . get ( TopicAndPartition ( topic , partitionId ) ) . isDefined
} finally {
partitionMapLock . unlock ( )
}
}
def partitionCount ( ) = {
fetchMapLock synchronized {
fetchMap . size
partitionMapLock . lock ( )
try {
partitionMap . size
} finally {
partitionMapLock . unlock ( )
}
}
}