From 6be908a8296456adee254b405605acff55fd47a5 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 25 Apr 2018 17:49:02 -0700 Subject: [PATCH] MINOR: Refactor AdminClient ListConsumerGroups API (#4884) The current Iterator-based ListConsumerGroups API is synchronous. The API should be asynchronous to fit in with the other AdminClient APIs. Also fix some error handling corner cases. Reviewers: Guozhang Wang , Jason Gustafson --- .../kafka/clients/admin/KafkaAdminClient.java | 160 ++++++++++-------- .../admin/ListConsumerGroupsResult.java | 109 ++++++------ .../clients/admin/KafkaAdminClientTest.java | 52 +++--- 3 files changed, 166 insertions(+), 155 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index fa3f943555b..d8c0bad8066 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownServerException; @@ -2342,16 +2343,56 @@ public class KafkaAdminClient extends AdminClient { return new DescribeConsumerGroupsResult(new HashMap>(futures)); } + private final static class ListConsumerGroupsResults { + private final List errors; + private final HashMap listings; + private final HashSet remaining; + private final KafkaFutureImpl> future; + + ListConsumerGroupsResults(Collection errors, Collection leaders, + KafkaFutureImpl> future) { + this.errors = new ArrayList<>(errors); + this.listings = new HashMap<>(); + this.remaining = new HashSet<>(leaders); + this.future = future; + tryComplete(); + } + + synchronized void addError(Throwable throwable, Node node) { + ApiError error = ApiError.fromThrowable(throwable); + if (error.message() == null || error.message().isEmpty()) { + errors.add(error.error().exception( + "Error listing groups on " + node)); + } else { + errors.add(error.error().exception( + "Error listing groups on " + node + ": " + error.message())); + } + } + + synchronized void addListing(ConsumerGroupListing listing) { + listings.put(listing.groupId(), listing); + } + + synchronized void tryComplete(Node leader) { + remaining.remove(leader); + tryComplete(); + } + + private synchronized void tryComplete() { + if (remaining.isEmpty()) { + ArrayList results = new ArrayList(listings.values()); + results.addAll(errors); + future.complete(results); + } + } + }; + @Override public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { - final Map>> futuresMap = new HashMap<>(); - final KafkaFutureImpl> flattenFuture = new KafkaFutureImpl<>(); - final KafkaFutureImpl listFuture = new KafkaFutureImpl<>(); - + final KafkaFutureImpl> all = new KafkaFutureImpl<>(); final long nowMetadata = time.milliseconds(); final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); - - runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) { + runnable.call(new Call("findGroupsMetadata", deadline, new LeastLoadedNodeProvider()) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true); @@ -2360,68 +2401,38 @@ public class KafkaAdminClient extends AdminClient { @Override void handleResponse(AbstractResponse abstractResponse) { MetadataResponse metadataResponse = (MetadataResponse) abstractResponse; - + final List metadataExceptions = new ArrayList<>(); + final HashSet leaders = new HashSet<>(); for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) { - if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) { + if (metadata.error() != Errors.NONE) { + metadataExceptions.add(metadata.error().exception("Unable to locate " + + Topic.GROUP_METADATA_TOPIC_NAME)); + } else if (!metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) { + metadataExceptions.add(new UnknownServerException("Server returned unrequested " + + "information about unexpected topic " + metadata.topic())); + } else { for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { final Node leader = partitionMetadata.leader(); if (partitionMetadata.error() != Errors.NONE) { // TODO: KAFKA-6789, retry based on the error code - KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.completeExceptionally(partitionMetadata.error().exception()); - // if it is the leader not found error, then the leader might be NoNode; if there are more than - // one such error, we will only have one entry in the map. For now it is okay since we are not - // guaranteeing to return the full list of consumers still. - futuresMap.put(leader, future); + metadataExceptions.add(partitionMetadata.error().exception("Unable to find " + + "leader for partition " + partitionMetadata.partition() + " of " + + Topic.GROUP_METADATA_TOPIC_NAME)); + } else if (leader == null || leader.equals(Node.noNode())) { + metadataExceptions.add(new LeaderNotAvailableException("Unable to find leader " + + "for partition " + partitionMetadata.partition() + " of " + + Topic.GROUP_METADATA_TOPIC_NAME)); } else { - futuresMap.put(leader, new KafkaFutureImpl>()); + leaders.add(leader); } } - listFuture.complete(null); - } else { - if (metadata.error() != Errors.NONE) - listFuture.completeExceptionally(metadata.error().exception()); - else - listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for " - + metadata.topic() + " is returned; cannot find the brokers to query consumer listings.")); } } - - // we have to flatten the future here instead in the result, because we need to wait until the map of nodes - // are known from the listNode request. - flattenFuture.copyWith( - KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])), - new KafkaFuture.BaseFunction>() { - @Override - public Collection apply(Void v) { - List listings = new ArrayList<>(); - for (Map.Entry>> entry : futuresMap.entrySet()) { - Collection results; - try { - results = entry.getValue().get(); - listings.addAll(results); - } catch (Throwable e) { - // This should be unreachable, because allOf ensured that all the futures - // completed successfully. - throw new RuntimeException(e); - } - } - return listings; - } - }); - - for (final Map.Entry>> entry : futuresMap.entrySet()) { - // skip sending the request for those futures who have already failed - if (entry.getValue().isCompletedExceptionally()) - continue; - + final ListConsumerGroupsResults results = + new ListConsumerGroupsResults(metadataExceptions, leaders, all); + for (final Node node : leaders) { final long nowList = time.milliseconds(); - - final int brokerId = entry.getKey().id(); - final KafkaFutureImpl> future = entry.getValue(); - - runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) { - + runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { return new ListGroupsRequest.Builder(); @@ -2430,39 +2441,42 @@ public class KafkaAdminClient extends AdminClient { @Override void handleResponse(AbstractResponse abstractResponse) { final ListGroupsResponse response = (ListGroupsResponse) abstractResponse; - - if (response.error() != Errors.NONE) { - future.completeExceptionally(response.error().exception()); - } else { - final List groupsListing = new ArrayList<>(); - for (ListGroupsResponse.Group group : response.groups()) { - if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) { - final String groupId = group.groupId(); - final String protocolType = group.protocolType(); - final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty()); - groupsListing.add(groupListing); + synchronized (results) { + if (response.error() != Errors.NONE) { + results.addError(response.error().exception(), node); + } else { + for (ListGroupsResponse.Group group : response.groups()) { + if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || + group.protocolType().isEmpty()) { + final String groupId = group.groupId(); + final String protocolType = group.protocolType(); + final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty()); + results.addListing(groupListing); + } } } - future.complete(groupsListing); + results.tryComplete(node); } } @Override void handleFailure(Throwable throwable) { - future.completeExceptionally(throwable); + synchronized (results) { + results.addError(throwable, node); + results.tryComplete(node); + } } }, nowList); - } } @Override void handleFailure(Throwable throwable) { - listFuture.completeExceptionally(throwable); + all.complete(Collections.singletonList(throwable)); } }, nowMetadata); - return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap); + return new ListConsumerGroupsResult(all); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java index c3f1236eb09..0ac852945f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java @@ -18,14 +18,11 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.Node; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.internals.KafkaFutureImpl; -import org.apache.kafka.common.utils.AbstractIterator; +import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; -import java.util.Map; /** * The result of the {@link AdminClient#listConsumerGroups()} call. @@ -34,70 +31,72 @@ import java.util.Map; */ @InterfaceStability.Evolving public class ListConsumerGroupsResult { - private final Map>> futuresMap; - private final KafkaFuture> flattenFuture; - private final KafkaFuture listFuture; + private final KafkaFutureImpl> all; + private final KafkaFutureImpl> valid; + private final KafkaFutureImpl> errors; - ListConsumerGroupsResult(final KafkaFuture listFuture, - final KafkaFuture> flattenFuture, - final Map>> futuresMap) { - this.flattenFuture = flattenFuture; - this.listFuture = listFuture; - this.futuresMap = futuresMap; - } - - private class FutureConsumerGroupListingIterator extends AbstractIterator> { - private Iterator>> futuresIter; - private Iterator innerIter; - - @Override - protected KafkaFuture makeNext() { - if (futuresIter == null) { - try { - listFuture.get(); - } catch (Exception e) { - // the list future has failed, there will be no listings to show at all - return allDone(); - } - - futuresIter = futuresMap.values().iterator(); - } - - while (innerIter == null || !innerIter.hasNext()) { - if (futuresIter.hasNext()) { - KafkaFuture> collectionFuture = futuresIter.next(); - try { - Collection collection = collectionFuture.get(); - innerIter = collection.iterator(); - } catch (Exception e) { - KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.completeExceptionally(e); - return future; + ListConsumerGroupsResult(KafkaFutureImpl> future) { + this.all = new KafkaFutureImpl<>(); + this.valid = new KafkaFutureImpl<>(); + this.errors = new KafkaFutureImpl<>(); + future.thenApply(new KafkaFuture.BaseFunction, Void>() { + @Override + public Void apply(Collection results) { + ArrayList curErrors = new ArrayList<>(); + ArrayList curValid = new ArrayList<>(); + for (Object resultObject : results) { + if (resultObject instanceof Throwable) { + curErrors.add((Throwable) resultObject); + } else { + curValid.add((ConsumerGroupListing) resultObject); } + } + if (!curErrors.isEmpty()) { + all.completeExceptionally(curErrors.get(0)); } else { - return allDone(); + all.complete(curValid); } + valid.complete(curValid); + errors.complete(curErrors); + return null; } + }); + } - KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.complete(innerIter.next()); - return future; - } + /** + * Returns a future that yields either an exception, or the full set of consumer group + * listings. + * + * In the event of a failure, the future yields nothing but the first exception which + * occurred. + */ + public KafkaFutureImpl> all() { + return all; } /** - * Return an iterator of futures for ConsumerGroupListing objects; the returned future will throw exception - * if we cannot get a complete collection of consumer listings. + * Returns a future which yields just the valid listings. + * + * This future never fails with an error, no matter what happens. Errors are completely + * ignored. If nothing can be fetched, an empty collection is yielded. + * If there is an error, but some results can be returned, this future will yield + * those partial results. When using this future, it is a good idea to also check + * the errors future so that errors can be displayed and handled. */ - public Iterator> iterator() { - return new FutureConsumerGroupListingIterator(); + public KafkaFutureImpl> valid() { + return valid; } /** - * Return a future which yields a full collection of ConsumerGroupListing objects; will throw exception - * if we cannot get a complete collection of consumer listings. + * Returns a future which yields just the errors which occurred. + * + * If this future yields a non-empty collection, it is very likely that elements are + * missing from the valid() set. + * + * This future itself never fails with an error. In the event of an error, this future + * will successfully yield a collection containing at least one exception. */ - public KafkaFuture> all() { - return flattenFuture; + public KafkaFutureImpl> errors() { + return errors; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d2789b62621..0debed3dead 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderForPartitionException; @@ -80,7 +81,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -648,9 +648,8 @@ public class KafkaAdminClientTest { } } - //Ignoring test to be fixed on follow-up PR @Test - public void testListConsumerGroups() { + public void testListConsumerGroups() throws Exception { final HashMap nodes = new HashMap<>(); Node node0 = new Node(0, "localhost", 8121); Node node1 = new Node(1, "localhost", 8122); @@ -685,7 +684,8 @@ public class KafkaAdminClientTest { env.cluster().nodes(), env.cluster().clusterResource().clusterId(), env.cluster().controller().id(), - Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata)))); + Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, + Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata)))); env.kafkaClient().prepareResponseFrom( new ListGroupsResponse( @@ -713,31 +713,29 @@ public class KafkaAdminClientTest { node2); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - - try { - Collection listing = result.all().get(); - fail("Expected to throw exception"); - } catch (Exception e) { - // this is good - } - - Iterator> iterator = result.iterator(); - int numListing = 0; - int numFailure = 0; - - while (iterator.hasNext()) { - KafkaFuture future = iterator.next(); - try { - ConsumerGroupListing listing = future.get(); - numListing++; - assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2")); - } catch (Exception e) { - numFailure++; - } + assertFutureError(result.all(), CoordinatorNotAvailableException.class); + Collection listings = result.valid().get(); + assertEquals(2, listings.size()); + for (ConsumerGroupListing listing : listings) { + assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2")); } + assertEquals(1, result.errors().get().size()); - assertEquals(2, numListing); - assertEquals(1, numFailure); + // Test handling the error where we are unable to get metadata for the __consumer_offsets topic. + env.kafkaClient().prepareResponse( + new MetadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + Collections.singletonList(new MetadataResponse.TopicMetadata( + Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME, + true, Collections.emptyList())))); + final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups(); + Collection errors = result2.errors().get(); + assertEquals(1, errors.size()); + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forException(errors.iterator().next())); + assertTrue(result2.valid().get().isEmpty()); + assertFutureError(result2.all(), UnknownTopicOrPartitionException.class); } }