Browse Source

KAFKA-6648; Fetcher.getTopicMetadata() should return all partitions for each requested topic

Currently Fetcher.getTopicMetadata() will not include offline partitions. Thus
KafkaConsumer.partitionsFor(topic) will not return all partitions of a topic if
there if any partition of the topic is offline. This causes problem if user
tries to query the total number of partitions of the given topic.

Author: radai-rosenblatt <radai.rosenblatt@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #4679 from radai-rosenblatt/partition_shenanigans
pull/5457/head
radai-rosenblatt 6 years ago committed by Dong Lin
parent
commit
09fe51f3eb
  1. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  2. 46
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -313,7 +313,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -313,7 +313,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (!shouldRetry) {
HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
for (String topic : cluster.topics())
topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
return topicsPartitionInfos;
}
}

46
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@ -84,6 +84,7 @@ import org.apache.kafka.test.DelayedReceive; @@ -84,6 +84,7 @@ import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -1388,6 +1389,51 @@ public class FetcherTest { @@ -1388,6 +1389,51 @@ public class FetcherTest {
assertTrue(topicMetadata.containsKey(topicName));
}
@Test
public void testGetTopicMetadataOfflinePartitions() {
MetadataResponse originalResponse = newMetadataResponse(topicName, Errors.NONE); //baseline ok response
//create a response based on the above one with all partitions being leaderless
List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>();
for (MetadataResponse.TopicMetadata item : originalResponse.topicMetadata()) {
List<MetadataResponse.PartitionMetadata> partitions = item.partitionMetadata();
List<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<>();
for (MetadataResponse.PartitionMetadata p : partitions) {
altPartitions.add(new MetadataResponse.PartitionMetadata(
p.error(),
p.partition(),
null, //no leader
p.replicas(),
p.isr(),
p.offlineReplicas())
);
}
MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata(
item.error(),
item.topic(),
item.isInternal(),
altPartitions
);
altTopics.add(alteredTopic);
}
Node controller = originalResponse.controller();
MetadataResponse altered = new MetadataResponse(
(List<Node>) originalResponse.brokers(),
originalResponse.clusterId(),
controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID,
altTopics);
client.prepareResponse(altered);
Map<String, List<PartitionInfo>> topicMetadata =
fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topicName), false), 5000L);
Assert.assertNotNull(topicMetadata);
Assert.assertNotNull(topicMetadata.get(topicName));
//noinspection ConstantConditions
Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
}
/*
* Send multiple requests. Verify that the client side quota metrics have the right values
*/

Loading…
Cancel
Save