@ -26,20 +26,26 @@ import net.sourceforge.argparse4j.inf.Subparsers;
@@ -26,20 +26,26 @@ import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin ;
import org.apache.kafka.clients.admin.AdminClientConfig ;
import org.apache.kafka.clients.admin.QuorumInfo ;
import org.apache.kafka.common.KafkaException ;
import org.apache.kafka.common.utils.Exit ;
import org.apache.kafka.common.utils.Utils ;
import org.apache.kafka.server.util.ToolsUtils ;
import java.io.File ;
import java.io.IOException ;
import java.time.Duration ;
import java.time.Instant ;
import java.util.ArrayList ;
import java.util.Comparator ;
import java.util.List ;
import java.util.Optional ;
import java.util.Properties ;
import java.util.concurrent.ExecutionException ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
import static java.lang.String.format ;
import static java.lang.String.valueOf ;
import static java.util.Arrays.asList ;
/ * *
@ -74,13 +80,11 @@ public class MetadataQuorumCommand {
@@ -74,13 +80,11 @@ public class MetadataQuorumCommand {
. addArgument ( "--bootstrap-server" )
. help ( "A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster." )
. required ( true ) ;
parser
. addArgument ( "--command-config" )
. type ( Arguments . fileType ( ) )
. help ( "Property file containing configs to be passed to Admin Client." ) ;
Subparsers subparsers = parser . addSubparsers ( ) . dest ( "command" ) ;
addDescribeParser ( subparsers ) ;
addDescribeSubParser ( parser ) ;
Admin admin = null ;
try {
@ -96,14 +100,18 @@ public class MetadataQuorumCommand {
@@ -96,14 +100,18 @@ public class MetadataQuorumCommand {
if ( namespace . getBoolean ( "status" ) & & namespace . getBoolean ( "replication" ) ) {
throw new TerseException ( "Only one of --status or --replication should be specified with describe sub-command" ) ;
} else if ( namespace . getBoolean ( "replication" ) ) {
handleDescribeReplication ( admin ) ;
boolean humanReadable = Optional . of ( namespace . getBoolean ( "human_readable" ) ) . orElse ( false ) ;
handleDescribeReplication ( admin , humanReadable ) ;
} else if ( namespace . getBoolean ( "status" ) ) {
if ( namespace . getBoolean ( "human_readable" ) ) {
throw new TerseException ( "The option --human-readable is only supported along with --replication" ) ;
}
handleDescribeStatus ( admin ) ;
} else {
throw new TerseException ( "One of --status or --replication must be specified with describe sub-command" ) ;
}
} else {
throw new IllegalStateException ( "Unknown command: " + command + ", only 'describe' is supported" ) ;
throw new IllegalStateException ( format ( "Unknown command: %s, only 'describe' is supported" , command ) ) ;
}
} finally {
if ( admin ! = null )
@ -121,7 +129,8 @@ public class MetadataQuorumCommand {
@@ -121,7 +129,8 @@ public class MetadataQuorumCommand {
}
}
private static void addDescribeParser ( Subparsers subparsers ) {
private static void addDescribeSubParser ( ArgumentParser parser ) {
Subparsers subparsers = parser . addSubparsers ( ) . dest ( "command" ) ;
Subparser describeParser = subparsers
. addParser ( "describe" )
. help ( "Describe the metadata quorum info" ) ;
@ -131,22 +140,27 @@ public class MetadataQuorumCommand {
@@ -131,22 +140,27 @@ public class MetadataQuorumCommand {
. addArgument ( "--status" )
. help ( "A short summary of the quorum status and the other provides detailed information about the status of replication." )
. action ( Arguments . storeTrue ( ) ) ;
ArgumentGroup replicationArgs = describeParser . addArgumentGroup ( "Replication" ) ;
replicationArgs
. addArgument ( "--replication" )
. help ( "Detailed information about the status of replication" )
. action ( Arguments . storeTrue ( ) ) ;
replicationArgs
. addArgument ( "--human-readable" )
. help ( "Human-readable output" )
. action ( Arguments . storeTrue ( ) ) ;
}
private static void handleDescribeReplication ( Admin admin ) throws ExecutionException , InterruptedException {
private static void handleDescribeReplication ( Admin admin , boolean humanReadable ) throws ExecutionException , InterruptedException {
QuorumInfo quorumInfo = admin . describeMetadataQuorum ( ) . quorumInfo ( ) . get ( ) ;
int leaderId = quorumInfo . leaderId ( ) ;
QuorumInfo . ReplicaState leader = quorumInfo . voters ( ) . stream ( ) . filter ( voter - > voter . replicaId ( ) = = leaderId ) . findFirst ( ) . get ( ) ;
List < List < String > > rows = new ArrayList < > ( ) ;
rows . addAll ( quorumInfoToRows ( leader , Stream . of ( leader ) , "Leader" ) ) ;
rows . addAll ( quorumInfoToRows ( leader , quorumInfo . voters ( ) . stream ( ) . filter ( v - > v . replicaId ( ) ! = leaderId ) , "Follower" ) ) ;
rows . addAll ( quorumInfoToRows ( leader , quorumInfo . observers ( ) . stream ( ) , "Observer" ) ) ;
rows . addAll ( quorumInfoToRows ( leader , Stream . of ( leader ) , "Leader" , humanReadable ) ) ;
rows . addAll ( quorumInfoToRows ( leader , quorumInfo . voters ( ) . stream ( ) . filter ( v - > v . replicaId ( ) ! = leaderId ) , "Follower" , humanReadable ) ) ;
rows . addAll ( quorumInfoToRows ( leader , quorumInfo . observers ( ) . stream ( ) , "Observer" , humanReadable ) ) ;
ToolsUtils . prettyPrintTable (
asList ( "NodeId" , "LogEndOffset" , "Lag" , "LastFetchTimestamp" , "LastCaughtUpTimestamp" , "Status" ) ,
@ -155,17 +169,39 @@ public class MetadataQuorumCommand {
@@ -155,17 +169,39 @@ public class MetadataQuorumCommand {
) ;
}
private static List < List < String > > quorumInfoToRows ( QuorumInfo . ReplicaState leader , Stream < QuorumInfo . ReplicaState > infos , String status ) {
return infos . map ( info - >
Stream . of (
private static List < List < String > > quorumInfoToRows ( QuorumInfo . ReplicaState leader ,
Stream < QuorumInfo . ReplicaState > infos ,
String status ,
boolean humanReadable ) {
return infos . map ( info - > {
String lastFetchTimestamp = ! info . lastFetchTimestamp ( ) . isPresent ( ) ? "-1" :
humanReadable ? format ( "%d ms ago" , relativeTimeMs ( info . lastFetchTimestamp ( ) . getAsLong ( ) , "last fetch" ) ) :
valueOf ( info . lastFetchTimestamp ( ) . getAsLong ( ) ) ;
String lastCaughtUpTimestamp = ! info . lastCaughtUpTimestamp ( ) . isPresent ( ) ? "-1" :
humanReadable ? format ( "%d ms ago" , relativeTimeMs ( info . lastCaughtUpTimestamp ( ) . getAsLong ( ) , "last caught up" ) ) :
valueOf ( info . lastCaughtUpTimestamp ( ) . getAsLong ( ) ) ;
return Stream . of (
info . replicaId ( ) ,
info . logEndOffset ( ) ,
leader . logEndOffset ( ) - info . logEndOffset ( ) ,
info . lastFetchTimestamp ( ) . orElse ( - 1 ) ,
info . lastCaughtUpTimestamp ( ) . orElse ( - 1 ) ,
lastFetchTimestamp ,
lastCaughtUpTimestamp ,
status
) . map ( r - > r . toString ( ) ) . collect ( Collectors . toList ( ) )
) . collect ( Collectors . toList ( ) ) ;
) . map ( r - > r . toString ( ) ) . collect ( Collectors . toList ( ) ) ;
} ) . collect ( Collectors . toList ( ) ) ;
}
// visible for testing
static long relativeTimeMs ( long timestampMs , String desc ) {
Instant lastTimestamp = Instant . ofEpochMilli ( timestampMs ) ;
Instant now = Instant . now ( ) ;
if ( ! ( lastTimestamp . isAfter ( Instant . EPOCH ) & & lastTimestamp . isBefore ( now ) ) ) {
throw new KafkaException (
format ( "Error while computing relative time, possible drift in system clock.%n" +
"Current timestamp is %d, %s timestamp is %d" , now . toEpochMilli ( ) , desc , timestampMs )
) ;
}
return Duration . between ( lastTimestamp , now ) . toMillis ( ) ;
}
private static void handleDescribeStatus ( Admin admin ) throws ExecutionException , InterruptedException {