diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index fd52cb6e5a8..dd412ab093f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -313,7 +313,7 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { if (!shouldRetry) { HashMap> topicsPartitionInfos = new HashMap<>(); for (String topic : cluster.topics()) - topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic)); + topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic)); return topicsPartitionInfos; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4169550ef11..f97c266d815 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -84,6 +84,7 @@ import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -1388,6 +1389,51 @@ public class FetcherTest { assertTrue(topicMetadata.containsKey(topicName)); } + @Test + public void testGetTopicMetadataOfflinePartitions() { + MetadataResponse originalResponse = newMetadataResponse(topicName, Errors.NONE); //baseline ok response + + //create a response based on the above one with all partitions being leaderless + List altTopics = new ArrayList<>(); + for (MetadataResponse.TopicMetadata item : originalResponse.topicMetadata()) { + List partitions = item.partitionMetadata(); + List altPartitions = new ArrayList<>(); + for (MetadataResponse.PartitionMetadata p : partitions) { + altPartitions.add(new MetadataResponse.PartitionMetadata( + p.error(), + p.partition(), + null, //no leader + p.replicas(), + p.isr(), + p.offlineReplicas()) + ); + } + MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata( + item.error(), + item.topic(), + item.isInternal(), + altPartitions + ); + altTopics.add(alteredTopic); + } + Node controller = originalResponse.controller(); + MetadataResponse altered = new MetadataResponse( + (List) originalResponse.brokers(), + originalResponse.clusterId(), + controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID, + altTopics); + + client.prepareResponse(altered); + + Map> topicMetadata = + fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topicName), false), 5000L); + + Assert.assertNotNull(topicMetadata); + Assert.assertNotNull(topicMetadata.get(topicName)); + //noinspection ConstantConditions + Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size()); + } + /* * Send multiple requests. Verify that the client side quota metrics have the right values */