@ -110,13 +110,14 @@ object ConsumerGroupCommand extends Logging {
@@ -110,13 +110,14 @@ object ConsumerGroupCommand extends Logging {
}
def printOffsetsToReset ( groupAssignmentsToReset : Map [ String , Map [ TopicPartition , OffsetAndMetadata ] ] ) : Unit = {
val format = "%-30s %-30s %-10s %-15s"
if ( groupAssignmentsToReset . nonEmpty )
println ( "\n%-30s %-30s %-10s %-15s" . format ( "GROUP" , "TOPIC" , "PARTITION" , "NEW-OFFSET" ) )
println ( "\n" + format . format ( "GROUP" , "TOPIC" , "PARTITION" , "NEW-OFFSET" ) )
for {
( groupId , assignment ) <- groupAssignmentsToReset
( consumerAssignment , offsetAndMetadata ) <- assignment
} {
println ( "%-30s %-30s %-10s %-15s" . format (
println ( format . format (
groupId ,
consumerAssignment . topic ,
consumerAssignment . partition ,
@ -217,9 +218,10 @@ object ConsumerGroupCommand extends Logging {
@@ -217,9 +218,10 @@ object ConsumerGroupCommand extends Logging {
for ( ( groupId , _ ) <- groupsAndStates ) {
maxGroupLen = Math . max ( maxGroupLen , groupId . length )
}
println ( s" % ${ - maxGroupLen } s %s " . format ( "GROUP" , "STATE" ) )
val format = s" % ${ - maxGroupLen } s %s "
println ( format . format ( "GROUP" , "STATE" ) )
for ( ( groupId , state ) <- groupsAndStates ) {
println ( s" % ${ - maxGroupLen } s %s " . format ( groupId , state ) )
println ( format . format ( groupId , state ) )
}
}
@ -264,14 +266,15 @@ object ConsumerGroupCommand extends Logging {
@@ -264,14 +266,15 @@ object ConsumerGroupCommand extends Logging {
}
}
println ( s" \n % ${ - maxGroupLen } s % ${ - maxTopicLen } s %-10s %-15s %-15s %-15s % ${ - maxConsumerIdLen } s % ${ - maxHostLen } s %s "
val format = s" % ${ - maxGroupLen } s % ${ - maxTopicLen } s %-10s %-15s %-15s %-15s % ${ - maxConsumerIdLen } s % ${ - maxHostLen } s %s "
println ( "\n" + format
. format ( "GROUP" , "TOPIC" , "PARTITION" , "CURRENT-OFFSET" , "LOG-END-OFFSET" , "LAG" , "CONSUMER-ID" , "HOST" , "CLIENT-ID" ) )
assignments match {
case None => // do nothing
case Some ( consumerAssignments ) =>
consumerAssignments . foreach { consumerAssignment =>
println ( s" % ${ - maxGroupLen } s % ${ - maxTopicLen } s %-10s %-15s %-15s %-15s % ${ - maxConsumerIdLen } s % ${ - maxHostLen } s %s " . format (
println ( format . format (
consumerAssignment . group ,
consumerAssignment . topic . getOrElse ( MISSING_COLUMN_VALUE ) , consumerAssignment . partition . getOrElse ( MISSING_COLUMN_VALUE ) ,
consumerAssignment . offset . getOrElse ( MISSING_COLUMN_VALUE ) , consumerAssignment . logEndOffset . getOrElse ( MISSING_COLUMN_VALUE ) ,
@ -302,11 +305,14 @@ object ConsumerGroupCommand extends Logging {
@@ -302,11 +305,14 @@ object ConsumerGroupCommand extends Logging {
}
}
val wideFormat = s" % ${ - maxGroupLen } s % ${ - maxConsumerIdLen } s % ${ - maxGroupInstanceIdLen } s % ${ - maxHostLen } s % ${ - maxClientIdLen } s %-15s "
val shortFormat = s" % ${ - maxGroupLen } s % ${ - maxConsumerIdLen } s % ${ - maxHostLen } s % ${ - maxClientIdLen } s %-15s "
if ( includeGroupInstanceId ) {
print ( s" \n % ${ - maxGroupLen } s % ${ - maxConsumerIdLen } s % ${ - maxGroupInstanceIdLen } s % ${ - maxHostLen } s % ${ - maxClientIdLen } s %-15s "
print ( "\n" + wideFormat
. format ( "GROUP" , "CONSUMER-ID" , "GROUP-INSTANCE-ID" , "HOST" , "CLIENT-ID" , "#PARTITIONS" ) )
} else {
print ( s" \n % ${ - maxGroupLen } s % ${ - maxConsumerIdLen } s % ${ - maxHostLen } s % ${ - maxClientIdLen } s %-15s "
print ( "\n" + shortFormat
. format ( "GROUP" , "CONSUMER-ID" , "HOST" , "CLIENT-ID" , "#PARTITIONS" ) )
}
if ( verbose )
@ -318,11 +324,11 @@ object ConsumerGroupCommand extends Logging {
@@ -318,11 +324,11 @@ object ConsumerGroupCommand extends Logging {
case Some ( memberAssignments ) =>
memberAssignments . foreach { memberAssignment =>
if ( includeGroupInstanceId ) {
print ( s" % ${ - maxGroupLen } s % ${ - maxConsumerIdLen } s % ${ - maxGroupInstanceIdLen } s % ${ - maxHostLen } s % ${ - maxClientIdLen } s %-15s " . format (
print ( wideFormat . format (
memberAssignment . group , memberAssignment . consumerId , memberAssignment . groupInstanceId , memberAssignment . host ,
memberAssignment . clientId , memberAssignment . numPartitions ) )
} else {
print ( s" % ${ - maxGroupLen } s % ${ - maxConsumerIdLen } s % ${ - maxHostLen } s % ${ - maxClientIdLen } s %-15s " . format (
print ( shortFormat . format (
memberAssignment . group , memberAssignment . consumerId , memberAssignment . host , memberAssignment . clientId , memberAssignment . numPartitions ) )
}
if ( verbose ) {
@ -347,8 +353,9 @@ object ConsumerGroupCommand extends Logging {
@@ -347,8 +353,9 @@ object ConsumerGroupCommand extends Logging {
if ( shouldPrintMemberState ( groupId , Some ( state . state ) , Some ( 1 ) ) ) {
val coordinator = s" ${ state . coordinator . host } : ${ state . coordinator . port } ( ${ state . coordinator . idString } ) "
val coordinatorColLen = Math . max ( 25 , coordinator . length )
print ( s" \n % ${ - coordinatorColLen } s %-25s %-20s %-15s %s " . format ( "GROUP" , "COORDINATOR (ID)" , "ASSIGNMENT-STRATEGY" , "STATE" , "#MEMBERS" ) )
print ( s" \n % ${ - coordinatorColLen } s %-25s %-20s %-15s %s " . format ( state . group , coordinator , state . assignmentStrategy , state . state , state . numMembers ) )
val format = s" \n % ${ - coordinatorColLen } s %-25s %-20s %-15s %s "
print ( format . format ( "GROUP" , "COORDINATOR (ID)" , "ASSIGNMENT-STRATEGY" , "STATE" , "#MEMBERS" ) )
print ( format . format ( state . group , coordinator , state . assignmentStrategy , state . state , state . numMembers ) )
println ( )
}
}
@ -525,9 +532,10 @@ object ConsumerGroupCommand extends Logging {
@@ -525,9 +532,10 @@ object ConsumerGroupCommand extends Logging {
printError ( s" Encounter some unknown error: $topLevelResult " )
}
println ( "\n%-30s %-15s %-15s" . format ( "TOPIC" , "PARTITION" , "STATUS" ) )
val format = "%-30s %-15s %-15s"
println ( "\n" + format . format ( "TOPIC" , "PARTITION" , "STATUS" ) )
partitionLevelResult . toList . sortBy ( t => t . _1 . topic + t . _1 . partition . toString ) . foreach { case ( tp , error ) =>
println ( "%-30s %-15s %-15s" . format (
println ( format . format (
tp . topic ,
if ( tp . partition >= 0 ) tp . partition else "Not Provided" ,
if ( error != null ) s" Error: ${ error . getMessage } " else "Successful"