@ -51,31 +51,32 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@@ -51,31 +51,32 @@ class ConsumerFetcherManager(private val consumerIdString: String,
if ( noLeaderPartitionSet . isEmpty )
cond . await ( )
val brokers = getAllBrokersInCluster ( zkClient )
val topicsMetadata = ClientUtils . fetchTopicMetadata ( noLeaderPartitionSet . map ( m => m . topic ) . toSet , brokers ) . topicsMetadata
val leaderForPartitionsMap = new HashMap [ ( String , Int ) , Broker ]
topicsMetadata . foreach (
tmd => {
val topic = tmd . topic
tmd . partitionsMetadata . foreach (
pmd => {
if ( pmd . leader . isDefined ) {
val partition = pmd . partitionId
val leaderBroker = pmd . leader . get
leaderForPartitionsMap . put ( ( topic , partition ) , leaderBroker )
}
try {
trace ( "Partitions without leader %s" . format ( noLeaderPartitionSet ) )
val brokers = getAllBrokersInCluster ( zkClient )
val topicsMetadata = ClientUtils . fetchTopicMetadata ( noLeaderPartitionSet . map ( m => m . topic ) . toSet , brokers ) . topicsMetadata
val leaderForPartitionsMap = new HashMap [ TopicAndPartition , Broker ]
topicsMetadata . foreach (
tmd => {
val topic = tmd . topic
tmd . partitionsMetadata . foreach (
pmd => {
val topicAndPartition = TopicAndPartition ( topic , pmd . partitionId )
if ( pmd . leader . isDefined && noLeaderPartitionSet . contains ( topicAndPartition ) ) {
val leaderBroker = pmd . leader . get
leaderForPartitionsMap . put ( topicAndPartition , leaderBroker )
}
} )
} )
} )
noLeaderPartitionSet . foreach
{
case ( TopicAndPartition ( topic , partitionId ) ) =>
// find the leader for this partition
val leaderBrokerOpt = leaderForPartitionsMap . get ( ( topic , partitionId ) )
if ( leaderBrokerOpt . isDefined ) {
val pti = partitionMap ( TopicAndPartition ( topic , partitionId ) )
addFetcher ( topic , partitionId , pti . getFetchOffset ( ) , leaderBrokerOpt . get )
noLeaderPartitionSet . remove ( TopicAndPartition ( topic , partitionId ) )
}
leaderForPartitionsMap . foreach {
case ( topicAndPartition , leaderBroker ) =>
val pti = partitionMap ( topicAndPartition )
addFetcher ( topicAndPartition . topic , topicAndPartition . partition , pti . getFetchOffset ( ) , leaderBroker )
}
noLeaderPartitionSet --= leaderForPartitionsMap . keySet
} catch {
case t => warn ( "Failed to find leader for %s" . format ( noLeaderPartitionSet ) , t )
}
} finally {
lock . unlock ( )