@@ -49,6 +49,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
@@ -2342,16 +2343,56 @@ public class KafkaAdminClient extends AdminClient {
        return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
    }
    private final static class ListConsumerGroupsResults {
        private final List<Throwable> errors;
        private final HashMap<String, ConsumerGroupListing> listings;
        private final HashSet<Node> remaining;
        private final KafkaFutureImpl<Collection<Object>> future;

        ListConsumerGroupsResults(Collection<Throwable> errors, Collection<Node> leaders,
                                  KafkaFutureImpl<Collection<Object>> future) {
            this.errors = new ArrayList<>(errors);
            this.listings = new HashMap<>();
            this.remaining = new HashSet<>(leaders);
            this.future = future;
            tryComplete();
        }

        synchronized void addError(Throwable throwable, Node node) {
            ApiError error = ApiError.fromThrowable(throwable);
            if (error.message() == null || error.message().isEmpty()) {
                errors.add(error.error().exception(
                    "Error listing groups on " + node));
            } else {
                errors.add(error.error().exception(
                    "Error listing groups on " + node + ": " + error.message()));
            }
        }

        synchronized void addListing(ConsumerGroupListing listing) {
            listings.put(listing.groupId(), listing);
        }

        synchronized void tryComplete(Node leader) {
            remaining.remove(leader);
            tryComplete();
        }

        private synchronized void tryComplete() {
            if (remaining.isEmpty()) {
                ArrayList<Object> results = new ArrayList<Object>(listings.values());
                results.addAll(errors);
                future.complete(results);
            }
        }
    }
    @Override
    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
        final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap = new HashMap<>();
        final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>();
        final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
        final long nowMetadata = time.milliseconds();
        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
        runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
        runnable.call(new Call("findGroupsMetadata", deadline, new LeastLoadedNodeProvider()) {
            @Override
            AbstractRequest.Builder createRequest(int timeoutMs) {
                return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true);
@@ -2360,68 +2401,38 @@ public class KafkaAdminClient extends AdminClient {
            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
                final List<Throwable> metadataExceptions = new ArrayList<>();
                final HashSet<Node> leaders = new HashSet<>();
                for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
                    if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
                    if (metadata.error() != Errors.NONE) {
                        metadataExceptions.add(metadata.error().exception("Unable to locate " +
                            Topic.GROUP_METADATA_TOPIC_NAME));
                    } else if (!metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
                        metadataExceptions.add(new UnknownServerException("Server returned unrequested " +
                            "information about unexpected topic " + metadata.topic()));
                    } else {
                        for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                            final Node leader = partitionMetadata.leader();
                            if (partitionMetadata.error() != Errors.NONE) {
                                // TODO: KAFKA-6789, retry based on the error code
                                KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<>();
                                future.completeExceptionally(partitionMetadata.error().exception());
                                // if it is the leader not found error, then the leader might be NoNode; if there are more than
                                // one such error, we will only have one entry in the map. For now it is okay since we are not
                                // guaranteeing to return the full list of consumers still.
                                futuresMap.put(leader, future);
                            } else {
                                futuresMap.put(leader, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
                            }
                        }
                        listFuture.complete(null);
                                metadataExceptions.add(partitionMetadata.error().exception("Unable to find " +
                                    "leader for partition " + partitionMetadata.partition() + " of " +
                                    Topic.GROUP_METADATA_TOPIC_NAME));
                            } else if (leader == null || leader.equals(Node.noNode())) {
                                metadataExceptions.add(new LeaderNotAvailableException("Unable to find leader " +
                                    "for partition " + partitionMetadata.partition() + " of " +
                                    Topic.GROUP_METADATA_TOPIC_NAME));
                            } else {
                        if (metadata.error() != Errors.NONE)
                            listFuture.completeExceptionally(metadata.error().exception());
                        else
                            listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for "
                                + metadata.topic() + " is returned; cannot find the brokers to query consumer listings."));
                    }
                                leaders.add(leader);
                            }
                // we have to flatten the future here instead in the result, because we need to wait until the map of nodes
                // are known from the listNode request.
                flattenFuture.copyWith(
                    KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])),
                    new KafkaFuture.BaseFunction<Void, Collection<ConsumerGroupListing>>() {
                        @Override
                        public Collection<ConsumerGroupListing> apply(Void v) {
                            List<ConsumerGroupListing> listings = new ArrayList<>();
                            for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
                                Collection<ConsumerGroupListing> results;
                                try {
                                    results = entry.getValue().get();
                                    listings.addAll(results);
                                } catch (Throwable e) {
                                    // This should be unreachable, because allOf ensured that all the futures
                                    // completed successfully.
                                    throw new RuntimeException(e);
                                }
                            }
                            return listings;
                        }
                    });
                for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
                    // skip sending the request for those futures who have already failed
                    if (entry.getValue().isCompletedExceptionally())
                        continue;
                final ListConsumerGroupsResults results =
                    new ListConsumerGroupsResults(metadataExceptions, leaders, all);
                for (final Node node : leaders) {
                    final long nowList = time.milliseconds();
                    final int brokerId = entry.getKey().id();
                    final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
                        @Override
                        AbstractRequest.Builder createRequest(int timeoutMs) {
                            return new ListGroupsRequest.Builder();
@@ -2430,39 +2441,42 @@ public class KafkaAdminClient extends AdminClient {
                        @Override
                        void handleResponse(AbstractResponse abstractResponse) {
                            final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
                            synchronized (results) {
                                if (response.error() != Errors.NONE) {
                                    future.completeExceptionally(response.error().exception());
                                    results.addError(response.error().exception(), node);
                                } else {
                                    final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
                                    for (ListGroupsResponse.Group group : response.groups()) {
                                        if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
                                        if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) ||
                                            group.protocolType().isEmpty()) {
                                            final String groupId = group.groupId();
                                            final String protocolType = group.protocolType();
                                            final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
                                            groupsListing.add(groupListing);
                                            results.addListing(groupListing);
                                        }
                                    }
                                }
                                future.complete(groupsListing);
                                results.tryComplete(node);
                            }
                        }

                        @Override
                        void handleFailure(Throwable throwable) {
                            future.completeExceptionally(throwable);
                            synchronized (results) {
                                results.addError(throwable, node);
                                results.tryComplete(node);
                            }
                        }
                    }, nowList);
                }
            }
            @Override
            void handleFailure(Throwable throwable) {
                listFuture.completeExceptionally(throwable);
                all.complete(Collections.<Object>singletonList(throwable));
            }
        }, nowMetadata);
        return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap);
        return new ListConsumerGroupsResult(all);
    }
@Override
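Usage sketch for the reworked API. The hunk above only shows the constructor call new ListConsumerGroupsResult(all), so the accessor names below (valid(), errors(), all()) are assumed from the released AdminClient API rather than confirmed by this diff, and the bootstrap address is a placeholder. The idea it illustrates is the one the change introduces: listings and per-node errors are collected into one combined future, so a caller can read partial results and inspect failures separately instead of having the whole call fail.

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;

public class ListGroupsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address; point this at a real broker.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ListConsumerGroupsResult result = admin.listConsumerGroups(
                new ListConsumerGroupsOptions().timeoutMs(10_000));

            // valid() is assumed to complete with whatever listings were gathered,
            // even if some __consumer_offsets partition leaders could not be queried.
            for (ConsumerGroupListing listing : result.valid().get()) {
                System.out.println(listing.groupId());
            }

            // errors() is assumed to surface the per-node failures accumulated by
            // ListConsumerGroupsResults instead of failing the entire request.
            for (Throwable error : result.errors().get()) {
                System.err.println("listConsumerGroups partial failure: " + error);
            }
        }
    }
}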