diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index a162ddbf12a..18a9e78215a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -548,6 +548,8 @@ public class Fetcher implements Closeable { remainingToSearch.keySet().retainAll(value.partitionsToRetry); } else if (!future.isRetriable()) { throw future.exception(); + } else { + metadata.requestUpdate(); } if (metadata.updateRequested()) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 6440c42db53..2e45935a6a8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -2475,6 +2475,7 @@ public class FetcherTest { public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() { buildFetcher(); + subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); final String anotherTopic = "another-topic"; final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0); @@ -2518,6 +2519,38 @@ public class FetcherTest { assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset()); } + @Test + public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersDisconnectException() { + buildFetcher(); + final String anotherTopic = "another-topic"; + final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0); + subscriptions.assignFromUser(Utils.mkSet(tp0, t2p0)); + + client.reset(); + + MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); + client.updateMetadata(initialMetadata); + + Map partitionNumByTopic = new HashMap<>(); + partitionNumByTopic.put(topicName, 1); + partitionNumByTopic.put(anotherTopic, 1); + MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith(1, partitionNumByTopic); + client.prepareMetadataUpdate(updatedMetadata); + + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), + listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), true); + client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), metadata.fetch().leaderFor(tp0)); + + Map timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP); + Map offsetAndTimestampMap = fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE)); + + assertNotNull("Expect Fetcher.offsetsForTimes() to return non-null result for " + tp0, + offsetAndTimestampMap.get(tp0)); + assertEquals(11L, offsetAndTimestampMap.get(tp0).offset()); + Assert.assertNotNull(metadata.fetch().partitionCountForTopic(anotherTopic)); + } + @Test(expected = TimeoutException.class) public void testBatchedListOffsetsMetadataErrors() { buildFetcher();