
KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.

Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of partitions, the consumer adds one topic at a time for the metadata refresh, resulting in one refresh per previously-unknown topic. We should add all the topics to the metadata topics up front and do a single metadata refresh instead.
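To make the scope concrete: the slow path is hit by any caller that passes partitions from many previously-unseen topics in one offsetsForTimes() call. A minimal caller sketch follows; the broker address, topic names, and partition counts are assumptions for illustration, not part of the patch.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumption: a broker reachable at localhost:9092.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Look up offsets for partitions spanning many topics in one call.
            // With this patch the consumer registers all the topics before
            // refreshing, so the lookup needs one metadata refresh rather than
            // one refresh per unknown topic.
            Map<TopicPartition, Long> timestamps = new HashMap<>();
            for (int i = 0; i < 100; i++)
                timestamps.put(new TopicPartition("topic-" + i, 0), 0L);

            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : offsets.entrySet())
                System.out.println(e.getKey() + " -> "
                        + (e.getValue() == null ? "no offset" : e.getValue().offset()));
        }
    }
}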

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #4478 from becketqin/KAFKA-6849
Commit 7804ea173b (pull/4483/merge)
  1. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (5 changes)
  2. clients/src/test/java/org/apache/kafka/clients/MockClient.java (18 changes)
  3. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (53 changes)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (5 changes)

@@ -604,11 +604,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             final Map<TopicPartition, Long> timestampsToSearch) {
         // Group the partitions by node.
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+
+        // Add the topics to the metadata to do a single metadata fetch.
+        for (TopicPartition tp : timestampsToSearch.keySet())
+            metadata.add(tp.topic());
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp = entry.getKey();
             PartitionInfo info = metadata.fetch().partition(tp);
             if (info == null) {
-                metadata.add(tp.topic());
                 log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
                 return RequestFuture.staleMetadata();
             } else if (info.leader() == null) {
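Why one staleMetadata() return now suffices: every requested topic is already registered, so the single refresh the caller performs after this failure covers all partitions, and the retry makes it through the whole loop. A toy model of the refresh-count difference (illustrative only, not Kafka code):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model: counts metadata refreshes under lazy vs. batched registration.
public class BatchedMetadataSketch {
    static int refreshes = 0;
    static Set<String> knownTopics = new HashSet<>();

    // Simulates a metadata refresh covering every registered topic.
    static void refresh(Set<String> registered) {
        refreshes++;
        knownTopics.addAll(registered);
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("t0", "t1", "t2", "t3", "t4");

        // Old behavior: register lazily; each unknown topic forces a
        // stale-metadata result, a refresh, and a retry.
        Set<String> registered = new HashSet<>();
        for (String topic : requested) {
            if (!knownTopics.contains(topic)) {
                registered.add(topic);
                refresh(registered);
            }
        }
        System.out.println("lazy registration: " + refreshes + " refreshes"); // 5

        // New behavior: register everything up front, refresh once.
        refreshes = 0;
        knownTopics.clear();
        registered = new HashSet<>(requested);
        refresh(registered);
        System.out.println("batched registration: " + refreshes + " refresh"); // 1
    }
}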

clients/src/test/java/org/apache/kafka/clients/MockClient.java (18 changes)

@@ -198,6 +198,12 @@ public class MockClient implements KafkaClient {
         if (metadataUpdate == null)
             metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
         else {
+            if (metadataUpdate.expectMatchRefreshTopics
+                && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
+                throw new IllegalStateException("The metadata topics does not match expectation. "
+                        + "Expected topics: " + metadataUpdate.cluster.topics()
+                        + ", asked topics: " + metadata.topics());
+            }
             this.unavailableTopics = metadataUpdate.unavailableTopics;
             metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
         }
@@ -344,7 +350,13 @@ public class MockClient implements KafkaClient {
     }

     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+    }
+
+    public void prepareMetadataUpdate(Cluster cluster,
+                                      Set<String> unavailableTopics,
+                                      boolean expectMatchMetadataTopics) {
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
     }

     public void setNode(Node node) {
@@ -433,9 +445,11 @@ public class MockClient implements KafkaClient {
     private static class MetadataUpdate {
         final Cluster cluster;
         final Set<String> unavailableTopics;
-        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+        final boolean expectMatchRefreshTopics;
+        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
             this.cluster = cluster;
             this.unavailableTopics = unavailableTopics;
+            this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
     }
 }
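The new overload lets a test assert that the consumer batched its topic registrations: when the flag is true, the mock throws unless the metadata's topic set already matches the prepared cluster's topics at refresh time. A hypothetical test fragment showing the intended usage (topic names and partition counts are illustrative; it mirrors the FetcherTest pattern below):

// Hypothetical usage inside a test built on MockClient and TestUtils.
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put("topicA", 2);
partitionCounts.put("topicB", 1);
Cluster cluster = TestUtils.clusterWith(2, partitionCounts);

// Fails with IllegalStateException if, at refresh time, the consumer has
// not registered exactly {topicA, topicB} with its metadata, i.e. if it
// did not batch all topics into a single refresh.
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);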

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (53 changes)

@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
@@ -1942,41 +1943,51 @@ public class FetcherTest {
         return 1;
     }

-    private void testGetOffsetsForTimesWithError(Errors errorForTp0,
-                                                 Errors errorForTp1,
-                                                 long offsetForTp0,
-                                                 long offsetForTp1,
-                                                 Long expectedOffsetForTp0,
-                                                 Long expectedOffsetForTp1) {
+    private void testGetOffsetsForTimesWithError(Errors errorForP0,
+                                                 Errors errorForP1,
+                                                 long offsetForP0,
+                                                 long offsetForP1,
+                                                 Long expectedOffsetForP0,
+                                                 Long expectedOffsetForP1) {
         client.reset();
-        // Ensure metadata has both partition.
-        Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        String topicName2 = "topic2";
+        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
+        // Expect a metadata refresh.
+        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+                        Collections.<String>emptySet(),
+                        time.milliseconds());
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(topicName2, 1);
+        Cluster cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+        // The metadata refresh should contain all the topics.
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);

         // First try should fail due to metadata error.
-        client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
         // Second try should succeed.
-        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));

         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
-        timestampToSearch.put(tp0, 0L);
+        timestampToSearch.put(t2p0, 0L);
         timestampToSearch.put(tp1, 0L);
         Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);

-        if (expectedOffsetForTp0 == null)
-            assertNull(offsetAndTimestampMap.get(tp0));
+        if (expectedOffsetForP0 == null)
+            assertNull(offsetAndTimestampMap.get(t2p0));
         else {
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp());
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).timestamp());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).offset());
         }
-        if (expectedOffsetForTp1 == null)
+        if (expectedOffsetForP1 == null)
             assertNull(offsetAndTimestampMap.get(tp1));
         else {
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).offset());
         }
     }
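For reference, the private helper would be driven from a public test method along these lines (a hypothetical invocation with illustrative error and offset values; the actual FetcherTest call sites may use different combinations):

// Hypothetical invocation: the first attempt returns a retriable error for
// each partition, the retry succeeds, and the helper asserts the final
// offsets and timestamps match the expected values.
@Test
public void testGetOffsetsForTimesWithRetriableErrors() {
    testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.LEADER_NOT_AVAILABLE,
                                    10L, 100L, 10L, 100L);
}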
