Browse Source

KAFKA-9159: The caller of sendListOffsetRequest need to handle retriable InvalidMetadata (#7665)

Loop call Consumer.endOffsets Throw TimeoutException: Failed to get offsets by times in 30000ms after a leader change.

Reviewers: Steven Lu, Guozhang Wang <wangguoz@gmail.com>
pull/7953/head
zzccctv 5 years ago committed by Guozhang Wang
parent
commit
39278d3089
  1. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  2. 33
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -548,6 +548,8 @@ public class Fetcher<K, V> implements Closeable { @@ -548,6 +548,8 @@ public class Fetcher<K, V> implements Closeable {
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
} else if (!future.isRetriable()) {
throw future.exception();
} else {
metadata.requestUpdate();
}
if (metadata.updateRequested())

33
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@ -2475,6 +2475,7 @@ public class FetcherTest { @@ -2475,6 +2475,7 @@ public class FetcherTest {
public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
buildFetcher();
subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
final String anotherTopic = "another-topic";
final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
@ -2518,6 +2519,38 @@ public class FetcherTest { @@ -2518,6 +2519,38 @@ public class FetcherTest {
assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset());
}
@Test
public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersDisconnectException() {
buildFetcher();
final String anotherTopic = "another-topic";
final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
subscriptions.assignFromUser(Utils.mkSet(tp0, t2p0));
client.reset();
MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
client.updateMetadata(initialMetadata);
Map<String, Integer> partitionNumByTopic = new HashMap<>();
partitionNumByTopic.put(topicName, 1);
partitionNumByTopic.put(anotherTopic, 1);
MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith(1, partitionNumByTopic);
client.prepareMetadataUpdate(updatedMetadata);
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), true);
client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), metadata.fetch().leaderFor(tp0));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE));
assertNotNull("Expect Fetcher.offsetsForTimes() to return non-null result for " + tp0,
offsetAndTimestampMap.get(tp0));
assertEquals(11L, offsetAndTimestampMap.get(tp0).offset());
Assert.assertNotNull(metadata.fetch().partitionCountForTopic(anotherTopic));
}
@Test(expected = TimeoutException.class)
public void testBatchedListOffsetsMetadataErrors() {
buildFetcher();

Loading…
Cancel
Save