@ -120,7 +120,7 @@ object TopicCommand extends Logging {
config . map ( _ . get ( TopicConfig . MIN_IN_SYNC_REPLICAS_CONFIG ) . value . toInt )
config . map ( _ . get ( TopicConfig . MIN_IN_SYNC_REPLICAS_CONFIG ) . value . toInt )
}
}
def hasUnderReplicatedPartitions : Boolean = {
def isUnderReplicated : Boolean = {
getReplicationFactor ( info , reassignment ) - info . isr . size > 0
getReplicationFactor ( info , reassignment ) - info . isr . size > 0
}
}
@ -128,7 +128,7 @@ object TopicCommand extends Logging {
info . leader != null
info . leader != null
}
}
def hasUnderMinIsrPartitions : Boolean = {
def isUnderMinIsr : Boolean = {
! hasLeader || minIsrCount . exists ( info . isr . size < _ )
! hasLeader || minIsrCount . exists ( info . isr . size < _ )
}
}
@ -161,13 +161,13 @@ object TopicCommand extends Logging {
val describePartitions : Boolean = ! opts . reportOverriddenConfigs
val describePartitions : Boolean = ! opts . reportOverriddenConfigs
private def shouldPrintUnderReplicatedPartitions ( partitionDescription : PartitionDescription ) : Boolean = {
private def shouldPrintUnderReplicatedPartitions ( partitionDescription : PartitionDescription ) : Boolean = {
opts . reportUnderReplicatedPartitions && partitionDescription . hasUnderReplicatedPartitions
opts . reportUnderReplicatedPartitions && partitionDescription . isUnderReplicated
}
}
private def shouldPrintUnavailablePartitions ( partitionDescription : PartitionDescription ) : Boolean = {
private def shouldPrintUnavailablePartitions ( partitionDescription : PartitionDescription ) : Boolean = {
opts . reportUnavailablePartitions && partitionDescription . hasUnavailablePartitions ( liveBrokers )
opts . reportUnavailablePartitions && partitionDescription . hasUnavailablePartitions ( liveBrokers )
}
}
private def shouldPrintUnderMinIsrPartitions ( partitionDescription : PartitionDescription ) : Boolean = {
private def shouldPrintUnderMinIsrPartitions ( partitionDescription : PartitionDescription ) : Boolean = {
opts . reportUnderMinIsrPartitions && partitionDescription . hasUnderMinIsrPartitions
opts . reportUnderMinIsrPartitions && partitionDescription . isUnderMinIsr
}
}
private def shouldPrintAtMinIsrPartitions ( partitionDescription : PartitionDescription ) : Boolean = {
private def shouldPrintAtMinIsrPartitions ( partitionDescription : PartitionDescription ) : Boolean = {
opts . reportAtMinIsrPartitions && partitionDescription . isAtMinIsrPartitions
opts . reportAtMinIsrPartitions && partitionDescription . isAtMinIsrPartitions
@ -269,27 +269,26 @@ object TopicCommand extends Logging {
} } . toMap . asJava ) . all ( ) . get ( )
} } . toMap . asJava ) . all ( ) . get ( )
}
}
override def describeTopic ( opts : TopicCommandOptions ) : Unit = {
private def listAllReassignments ( ) : Map [ TopicPartition , PartitionReassignment ] = {
val topics = getTopics ( opts . topic , opts . excludeInternalTopics )
try {
val allConfigs = adminClient . describeConfigs ( topics . map ( new ConfigResource ( Type . TOPIC , _ ) ) . asJavaCollection ) . values ( )
adminClient . listPartitionReassignments ( new ListPartitionReassignmentsOptions ) . reassignments ( ) . get ( ) . asScala
val liveBrokers = adminClient . describeCluster ( ) . nodes ( ) . get ( ) . asScala . map ( _ . id ( ) )
val topicDescriptions = adminClient . describeTopics ( topics . asJavaCollection ) . all ( ) . get ( ) . values ( ) . asScala
val reassignments : util.Map [ TopicPartition , PartitionReassignment ] = try {
if ( opts . topic . isEmpty ) {
adminClient . listPartitionReassignments ( new ListPartitionReassignmentsOptions ) . reassignments ( ) . get ( )
} else {
val topicPartitions = topicDescriptions . flatMap ( pi => pi . partitions ( ) . asScala . map ( tpi => new TopicPartition ( pi . name ( ) , tpi . partition ( ) ) ) )
adminClient . listPartitionReassignments ( topicPartitions . toSet . asJava ) . reassignments ( ) . get ( )
}
} catch {
} catch {
case e : ExecutionException =>
case e : ExecutionException =>
e . getCause match {
e . getCause match {
case ex : UnsupportedVersionException =>
case ex : UnsupportedVersionException =>
logger . debug ( "Couldn't query reassignments through the AdminClient API" , ex )
logger . debug ( "Couldn't query reassignments through the AdminClient API" , ex )
Collections . empty Map( )
Map ( )
case t => throw t
case t => throw t
}
}
}
}
}
override def describeTopic ( opts : TopicCommandOptions ) : Unit = {
val topics = getTopics ( opts . topic , opts . excludeInternalTopics )
val allConfigs = adminClient . describeConfigs ( topics . map ( new ConfigResource ( Type . TOPIC , _ ) ) . asJavaCollection ) . values ( )
val liveBrokers = adminClient . describeCluster ( ) . nodes ( ) . get ( ) . asScala . map ( _ . id ( ) )
val reassignments = listAllReassignments ( )
val topicDescriptions = adminClient . describeTopics ( topics . asJavaCollection ) . all ( ) . get ( ) . values ( ) . asScala
val describeOptions = new DescribeOptions ( opts , liveBrokers . toSet )
val describeOptions = new DescribeOptions ( opts , liveBrokers . toSet )
for ( td <- topicDescriptions ) {
for ( td <- topicDescriptions ) {
@ -302,7 +301,7 @@ object TopicCommand extends Logging {
if ( ! opts . reportOverriddenConfigs || hasNonDefault ) {
if ( ! opts . reportOverriddenConfigs || hasNonDefault ) {
val numPartitions = td . partitions ( ) . size
val numPartitions = td . partitions ( ) . size
val firstPartition = td . partitions . iterator . next ( )
val firstPartition = td . partitions . iterator . next ( )
val reassignment = getReassignment ( td . name , firstPartition . partition , reassignments )
val reassignment = reassignments . get ( new TopicPartition ( td . name , firstPartition . partition ) )
val topicDesc = TopicDescription ( topicName , numPartitions , getReplicationFactor ( firstPartition , reassignment ) , config , markedForDeletion = false )
val topicDesc = TopicDescription ( topicName , numPartitions , getReplicationFactor ( firstPartition , reassignment ) , config , markedForDeletion = false )
topicDesc . printDescription ( )
topicDesc . printDescription ( )
}
}
@ -310,7 +309,7 @@ object TopicCommand extends Logging {
if ( describeOptions . describePartitions ) {
if ( describeOptions . describePartitions ) {
for ( partition <- sortedPartitions ) {
for ( partition <- sortedPartitions ) {
val reassignment = getReassignment ( td . name , partition . partition , reassignments )
val reassignment = reassignments . get ( new TopicPartition ( td . name , partition . partition ) )
val partitionDesc = PartitionDescription ( topicName , partition , Some ( config ) , markedForDeletion = false , reassignment )
val partitionDesc = PartitionDescription ( topicName , partition , Some ( config ) , markedForDeletion = false , reassignment )
describeOptions . maybePrintPartitionDescription ( partitionDesc )
describeOptions . maybePrintPartitionDescription ( partitionDesc )
}
}
@ -318,11 +317,6 @@ object TopicCommand extends Logging {
}
}
}
}
private def getReassignment ( topic : String ,
partition : Int ,
reassignments : util.Map [ TopicPartition , PartitionReassignment ] ) : Option [ PartitionReassignment ] =
Option ( reassignments . get ( new TopicPartition ( topic , partition ) ) )
override def deleteTopic ( opts : TopicCommandOptions ) : Unit = {
override def deleteTopic ( opts : TopicCommandOptions ) : Unit = {
val topics = getTopics ( opts . topic , opts . excludeInternalTopics )
val topics = getTopics ( opts . topic , opts . excludeInternalTopics )
ensureTopicExists ( topics , opts . topic )
ensureTopicExists ( topics , opts . topic )
@ -561,9 +555,18 @@ object TopicCommand extends Logging {
}
}
private def getReplicationFactor ( tpi : TopicPartitionInfo , reassignment : Option [ PartitionReassignment ] ) : Int = {
private def getReplicationFactor ( tpi : TopicPartitionInfo , reassignment : Option [ PartitionReassignment ] ) : Int = {
// It is possible for a reassignment to complete between the time we have fetched its state and the time
// we fetch partition metadata . In ths case , we ignore the reassignment when determining replication factor .
def isReassignmentInProgress ( ra : PartitionReassignment ) : Boolean = {
// Reassignment is still in progress as long as the removing replicas are still present
val allReplicaIds = tpi . replicas . asScala . map ( _ . id ) . toSet
val removingReplicaIds = ra . removingReplicas . asScala . map ( Int . unbox ) . toSet
allReplicaIds . exists ( removingReplicaIds . contains )
}
reassignment match {
reassignment match {
case Some ( ra ) => ra . replicas . asScala . diff ( ra . addingReplicas . asScala ) . size
case Some ( ra ) if isReassignmentInProgress ( ra ) => ra . replicas . asScala . diff ( ra . addingReplicas . asScala ) . size
case None => tpi . replicas . size
case _ => tpi . replicas . size
}
}
}
}