@@ -17,7 +17,6 @@
package kafka.admin
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils._
@@ -26,15 +25,10 @@ import java.util.Random
import java.util.Properties
import kafka.common.TopicAlreadyMarkedForDeletionException
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, LeaderNotAvailableException, ReplicaNotAvailableException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
import scala.collection._
import scala.collection.JavaConverters._
import mutable.ListBuffer
import scala.collection.mutable
import collection.Map
import collection.Set
@@ -667,98 +661,9 @@ object AdminUtils extends Logging with AdminUtilities {
      .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap
  }
  def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
    fetchTopicMetadataFromZk(topic, zkUtils, mutable.Map.empty[Int, Broker],
      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] =
    fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, securityProtocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] =
    fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(securityProtocol))
  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, listenerName: ListenerName): Set[MetadataResponse.TopicMetadata] = {
    val cachedBrokerInfo = mutable.Map.empty[Int, Broker]
    topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, listenerName))
  }
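  // Assembles MetadataResponse.TopicMetadata for a single topic directly from ZooKeeper:
  // it reads the partition assignment, resolves the leader, replica and ISR broker ids to
  // Nodes through the shared broker-info cache, and maps any lookup failure to the matching
  // Errors code on the affected partition.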
  private def fetchTopicMetadataFromZk(topic: String,
                                       zkUtils: ZkUtils,
                                       cachedBrokerInfo: mutable.Map[Int, Broker],
                                       listenerName: ListenerName): MetadataResponse.TopicMetadata = {
    if (zkUtils.pathExists(getTopicPath(topic))) {
      val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic))(topic)
      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
      val partitionMetadata = sortedPartitions.map { partitionMap =>
        val partition = partitionMap._1
        val replicas = partitionMap._2
        val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition)
        val leader = zkUtils.getLeaderForPartition(topic, partition)
        debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
        var leaderInfo: Node = Node.noNode()
        var replicaInfo: Seq[Node] = Nil
        var isrInfo: Seq[Node] = Nil
        try {
          leaderInfo = leader match {
            case Some(l) =>
              try {
                getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(listenerName)
              } catch {
                case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
              }
            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
          }
          try {
            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(listenerName))
            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(listenerName))
          } catch {
            case e: Throwable => throw new ReplicaNotAvailableException(e)
          }
          if (replicaInfo.size < replicas.size)
            throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
              replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
          if (isrInfo.size < inSyncReplicas.size)
            throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
              inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
          new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
        } catch {
          case e: Throwable =>
            debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
            new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
        }
      }
      new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.asJava)
    } else {
      // topic doesn't exist, send appropriate error code
      new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic), java.util.Collections.emptyList())
    }
  }
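  // Resolves broker ids to Broker objects, consulting the supplied cache first and falling
  // back to ZooKeeper on a miss; ids that cannot be resolved at all are dropped from the result.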
  private def getBrokerInfoFromCache(zkUtils: ZkUtils,
                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
                                     brokerIds: Seq[Int]): Seq[Broker] = {
    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
    val brokerMetadata = brokerIds.map { id =>
      val optionalBrokerInfo = cachedBrokerInfo.get(id)
      optionalBrokerInfo match {
        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
        case None => // fetch it from zookeeper
          zkUtils.getBrokerInfo(id) match {
            case Some(brokerInfo) =>
              cachedBrokerInfo += (id -> brokerInfo)
              Some(brokerInfo)
            case None =>
              failedBrokerIds += id
              None
          }
      }
    }
    brokerMetadata.filter(_.isDefined).map(_.get)
  }
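  // Places a follower replica by offsetting the first replica's broker index with a shift in
  // [1, nBrokers - 1], so a follower is never assigned to the same broker as the first replica.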
  private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }
}
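The helpers removed above read topic metadata straight out of ZooKeeper. For reference only, below is a minimal sketch of retrieving comparable partition/leader/replica/ISR information through the public org.apache.kafka.clients.admin.AdminClient API; the object name, bootstrap address and topic name are placeholders, and this is one possible alternative rather than a drop-in equivalent of the deleted code.

import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

import scala.collection.JavaConverters._

object DescribeTopicSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder bootstrap address; point this at a real broker.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // describeTopics exposes per-partition leader, replica and ISR nodes,
      // roughly the information fetchTopicMetadataFromZk assembled from ZooKeeper state.
      val description = admin.describeTopics(List("my-topic").asJava).all().get().get("my-topic")
      description.partitions().asScala.foreach { p =>
        println(s"partition=${p.partition()} leader=${p.leader()} " +
          s"replicas=${p.replicas().asScala.map(_.id).mkString(",")} " +
          s"isr=${p.isr().asScala.map(_.id).mkString(",")}")
      }
    } finally {
      admin.close()
    }
  }
}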