From 863b534f83aad50528828a30857e1ff56ac93f27 Mon Sep 17 00:00:00 2001
From: Boyang Chen
Date: Sun, 16 Feb 2020 12:06:33 -0800
Subject: [PATCH] KAFKA-9535; Update metadata before retrying partitions when
 fetching offsets (#8088)

Today, if we attempt to list offsets with a fenced leader epoch, the consumer
retries without updating the metadata until the timeout is reached. This
affects synchronous APIs such as `offsetsForTimes`, `beginningOffsets`, and
`endOffsets`. The fix in this patch is to trigger a metadata update whenever
we see a retriable error, before making additional attempts. (A condensed
sketch of the resulting retry loop appears after the diff.)

Reviewers: Jason Gustafson
---
 .../clients/consumer/internals/Fetcher.java   | 125 ++++++++-------
 .../common/requests/ListOffsetRequest.java    |  14 ++
 .../consumer/internals/FetcherTest.java       | 146 +++++++++++++-----
 3 files changed, 181 insertions(+), 104 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f0aaa1346d4..8e474fa46db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -536,26 +536,21 @@ public class Fetcher<K, V> implements Closeable {
             RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
             client.poll(future, timer);
 
-            if (!future.isDone())
+            if (!future.isDone()) {
                 break;
-
-            if (future.succeeded()) {
+            } else if (future.succeeded()) {
                 ListOffsetResult value = future.value();
                 result.fetchedOffsets.putAll(value.fetchedOffsets);
-                if (value.partitionsToRetry.isEmpty())
-                    return result;
-
                 remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
-            } else {
-                metadata.requestUpdate();
             }
 
-            if (metadata.updateRequested())
+            if (remainingToSearch.isEmpty()) {
+                return result;
+            } else {
                 client.awaitMetadataUpdate(timer);
-            else
-                timer.sleep(retryBackoffMs);
+            }
         } while (timer.notExpired());
 
         throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
@@ -979,63 +974,66 @@ public class Fetcher<K, V> implements Closeable {
             TopicPartition topicPartition = entry.getKey();
             ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
             Errors error = partitionData.error;
-            if (error == Errors.NONE) {
-                if (partitionData.offsets != null) {
-                    // Handle v0 response
-                    long offset;
-                    if (partitionData.offsets.size() > 1) {
-                        future.raise(new IllegalStateException("Unexpected partitionData response of length " +
-                                partitionData.offsets.size()));
-                        return;
-                    } else if (partitionData.offsets.isEmpty()) {
-                        offset = ListOffsetResponse.UNKNOWN_OFFSET;
-                    } else {
-                        offset = partitionData.offsets.get(0);
-                    }
-                    log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
+            switch (error) {
+                case NONE:
+                    if (partitionData.offsets != null) {
+                        // Handle v0 response
+                        long offset;
+                        if (partitionData.offsets.size() > 1) {
+                            future.raise(new IllegalStateException("Unexpected partitionData response of length " +
+                                    partitionData.offsets.size()));
+                            return;
+                        } else if (partitionData.offsets.isEmpty()) {
+                            offset = ListOffsetResponse.UNKNOWN_OFFSET;
+                        } else {
+                            offset = partitionData.offsets.get(0);
+                        }
+                        log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
                             topicPartition, offset);
-                    if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
-                        fetchedOffsets.put(topicPartition, offsetData);
-                    }
-                } else {
-                    // Handle v1 and later response
-                    log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
+                        if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                            ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
+                            fetchedOffsets.put(topicPartition, offsetData);
+                        }
+                    } else {
+                        // Handle v1 and later response
+                        log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
                             topicPartition, partitionData.offset, partitionData.timestamp);
-                    if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
+                        if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                            ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
                                 partitionData.leaderEpoch);
-                        fetchedOffsets.put(topicPartition, offsetData);
+                            fetchedOffsets.put(topicPartition, offsetData);
+                        }
                     }
-                }
-            } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
-                // The message format on the broker side is before 0.10.0, which means it does not
-                // support timestamps. We treat this case the same as if we weren't able to find an
-                // offset corresponding to the requested timestamp and leave it out of the result.
-                log.debug("Cannot search by timestamp for partition {} because the message format version " +
-                        "is before 0.10.0", topicPartition);
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
-                    error == Errors.REPLICA_NOT_AVAILABLE ||
-                    error == Errors.KAFKA_STORAGE_ERROR ||
-                    error == Errors.OFFSET_NOT_AVAILABLE ||
-                    error == Errors.LEADER_NOT_AVAILABLE ||
-                    error == Errors.UNKNOWN_LEADER_EPOCH) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
+                    break;
+                case UNSUPPORTED_FOR_MESSAGE_FORMAT:
+                    // The message format on the broker side is before 0.10.0, which means it does not
+                    // support timestamps. We treat this case the same as if we weren't able to find an
+                    // offset corresponding to the requested timestamp and leave it out of the result.
+                    log.debug("Cannot search by timestamp for partition {} because the message format version " +
+                            "is before 0.10.0", topicPartition);
+                    break;
+                case NOT_LEADER_FOR_PARTITION:
+                case REPLICA_NOT_AVAILABLE:
+                case KAFKA_STORAGE_ERROR:
+                case OFFSET_NOT_AVAILABLE:
+                case LEADER_NOT_AVAILABLE:
+                case FENCED_LEADER_EPOCH:
+                case UNKNOWN_LEADER_EPOCH:
+                    log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
                         topicPartition, error);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.FENCED_LEADER_EPOCH) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to fenced leader epoch, refresh " +
-                        "the metadata and retrying.", topicPartition);
-                metadata.requestUpdate();
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                unauthorizedTopics.add(topicPartition.topic());
-            } else {
-                log.warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message());
-                partitionsToRetry.add(topicPartition);
+                    partitionsToRetry.add(topicPartition);
+                    break;
+                case UNKNOWN_TOPIC_OR_PARTITION:
+                    log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
+                    partitionsToRetry.add(topicPartition);
+                    break;
+                case TOPIC_AUTHORIZATION_FAILED:
+                    unauthorizedTopics.add(topicPartition.topic());
+                    break;
+                default:
+                    log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.",
+                        topicPartition, error.message());
+                    partitionsToRetry.add(topicPartition);
             }
         }
 
@@ -1239,7 +1237,6 @@ public class Fetcher<K, V> implements Closeable {
                     });
                 }
 
-                nextCompletedFetch.initialized = true;
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 17f921f6f32..1b6ad127d6e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
@@ -204,6 +205,19 @@ public class ListOffsetRequest extends AbstractRequest {
             this(timestamp, 1, currentLeaderEpoch);
         }
 
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof PartitionData)) return false;
+            PartitionData other = (PartitionData) obj;
+            return this.timestamp == other.timestamp &&
+                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(timestamp, currentLeaderEpoch);
+        }
+
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d0eba44bede..b757e885852 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -66,7 +66,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -406,13 +405,10 @@ public class FetcherTest {
     }
 
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest fetch = (FetchRequest) body;
-                return fetch.fetchData().containsKey(tp) &&
-                        fetch.fetchData().get(tp).fetchOffset == offset;
-            }
+        return body -> {
+            FetchRequest fetch = (FetchRequest) body;
+            return fetch.fetchData().containsKey(tp) &&
+                    fetch.fetchData().get(tp).fetchOffset == offset;
         };
     }
 
@@ -2371,13 +2367,9 @@ public class FetcherTest {
 
     @Test
     public void testGetOffsetsForTimesTimeout() {
-        try {
-            buildFetcher();
-            fetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L));
-            fail("Should throw timeout exception.");
-        } catch (TimeoutException e) {
-            // let it go.
-        }
+        buildFetcher();
+        assertThrows(TimeoutException.class, () -> fetcher.offsetsForTimes(
+            Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L)));
     }
 
     @Test
@@ -2385,7 +2377,7 @@ public class FetcherTest {
         buildFetcher();
 
         // Empty map
-        assertTrue(fetcher.offsetsForTimes(new HashMap<TopicPartition, Long>(), time.timer(100L)).isEmpty());
+        assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());
         // Unknown Offset
         testGetOffsetsForTimesWithUnknownOffset();
         // Error code none with unknown offset
@@ -2421,6 +2413,89 @@
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
     }
 
+    @Test
+    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
+        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_FOR_PARTITION,
+            Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE,
+            Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
+
+        final int newLeaderEpoch = 3;
+        MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith("dummy", 3,
+            singletonMap(topicName, Errors.NONE), singletonMap(topicName, 4), tp -> newLeaderEpoch);
+
+        Node originalLeader = initialUpdateResponse.cluster().leaderFor(tp1);
+        Node newLeader = updatedMetadata.cluster().leaderFor(tp1);
+        assertNotEquals(originalLeader, newLeader);
+
+        for (Errors retriableError : retriableErrors) {
+            buildFetcher();
+
+            subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
+            client.updateMetadata(initialUpdateResponse);
+
+            final long fetchTimestamp = 10L;
+            Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+            allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
+                Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
+            allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
+                retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+
+            client.prepareResponseFrom(body -> {
+                boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+                if (isListOffsetRequest) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+                    Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>();
+                    expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData(
+                        fetchTimestamp, Optional.empty()));
+                    expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData(
+                        fetchTimestamp, Optional.empty()));
+
+                    return request.partitionTimestamps().equals(expectedTopicPartitions);
+                } else {
+                    return false;
+                }
+            }, new ListOffsetResponse(allPartitionData), originalLeader);
+
+            client.prepareMetadataUpdate(updatedMetadata);
+
+            // If the metadata is not updated before retrying, the fetcher would consult the original leader
+            // and hit a NOT_LEADER error. We count the prepared responses that remain unanswered at the end
+            // to verify that this does not happen.
+            Map<TopicPartition, ListOffsetResponse.PartitionData> partitionDataWithFatalError = new HashMap<>(allPartitionData);
+            partitionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
+                Errors.NOT_LEADER_FOR_PARTITION, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+            client.prepareResponseFrom(new ListOffsetResponse(partitionDataWithFatalError), originalLeader);
+
+            // The request to the new leader must contain only the one partition (tp1) that saw an error.
+            client.prepareResponseFrom(body -> {
+                boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+                if (isListOffsetRequest) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+
+                    return request.partitionTimestamps().equals(
+                        Collections.singletonMap(tp1, new ListOffsetRequest.PartitionData(
+                            fetchTimestamp, Optional.of(newLeaderEpoch))));
+                } else {
+                    return false;
+                }
+            }, listOffsetResponse(tp1, Errors.NONE, fetchTimestamp, 5L), newLeader);
+
+            Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+                fetcher.offsetsForTimes(
+                    Utils.mkMap(Utils.mkEntry(tp0, fetchTimestamp),
+                        Utils.mkEntry(tp1, fetchTimestamp)), time.timer(Integer.MAX_VALUE));
+
+            assertEquals(Utils.mkMap(
+                Utils.mkEntry(tp0, new OffsetAndTimestamp(4L, fetchTimestamp)),
+                Utils.mkEntry(tp1, new OffsetAndTimestamp(5L, fetchTimestamp))), offsetAndTimestampMap);
+
+            // The prepared NOT_LEADER response should never be consumed, since the metadata is refreshed
+            // before the first retry and the original leader is never contacted again.
+            assertEquals(1, client.numAwaitingResponses());
+            fetcher.close();
+        }
+    }
+
     @Test
     public void testGetOffsetsUnknownLeaderEpoch() {
         buildFetcher();
@@ -2616,7 +2691,7 @@
             new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
             new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        currentOffset += commitTransaction(buffer, 1L, currentOffset);
+        commitTransaction(buffer, 1L, currentOffset);
         buffer.flip();
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -2628,13 +2703,10 @@
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest request = (FetchRequest) body;
-                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
         }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
         consumerClient.poll(time.timer(0));
@@ -2766,7 +2838,7 @@
         for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
             actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
         }
-        assertTrue(actuallyCommittedKeys.equals(committedKeys));
+        assertEquals(actuallyCommittedKeys, committedKeys);
     }
 
     @Test
@@ -3304,13 +3376,10 @@
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest request = (FetchRequest) body;
-                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
         }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
         consumerClient.poll(time.timer(0));
@@ -3344,12 +3413,11 @@
         return appendTransactionalRecords(buffer, pid, baseOffset, (int) baseOffset, records);
     }
 
-    private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+    private void commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
         short producerEpoch = 0;
        int partitionLeaderEpoch = 0;
         MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
             new EndTransactionMarker(ControlRecordType.COMMIT, 0));
-        return 1;
     }
 
     private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
@@ -3843,12 +3911,10 @@
 
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                ListOffsetRequest req = (ListOffsetRequest) body;
-                return timestamp == req.partitionTimestamps().get(tp0).timestamp;
-            }
+        return body -> {
+            ListOffsetRequest req = (ListOffsetRequest) body;
+            return req.partitionTimestamps().equals(Collections.singletonMap(
+                tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.empty())));
         };
     }
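
Note for readers skimming the patch: referenced from the commit message above,
this is a condensed sketch of the offset-lookup retry loop in Fetcher after
this change. It restates the first hunk rather than adding new behavior; the
names (`RequestFuture<ListOffsetResult>`, `client`, `timer`,
`remainingToSearch`, `result`) are the ones visible in the diff, with the
surrounding method elided. The point is the control flow: every retriable
failure now funnels into `client.awaitMetadataUpdate(timer)`, so the next
attempt runs against fresh metadata instead of re-querying a possibly fenced
or stale leader.

    do {
        RequestFuture<ListOffsetResult> future =
            sendListOffsetsRequests(remainingToSearch, requireTimestamps);
        client.poll(future, timer);

        if (!future.isDone()) {
            // Timer expired while the request was still in flight.
            break;
        } else if (future.succeeded()) {
            // Keep the offsets we fetched; narrow the search to the
            // partitions that came back with a retriable error.
            result.fetchedOffsets.putAll(future.value().fetchedOffsets);
            remainingToSearch.keySet().retainAll(future.value().partitionsToRetry);
        } else if (!future.isRetriable()) {
            // Fatal error: surface it to the caller immediately.
            throw future.exception();
        }

        if (remainingToSearch.isEmpty()) {
            return result;
        } else {
            // Retriable errors remain: block on a metadata refresh before
            // the next attempt instead of sleeping and resending blindly.
            client.awaitMetadataUpdate(timer);
        }
    } while (timer.notExpired());

The `equals`/`hashCode` overrides added to `ListOffsetRequest.PartitionData`
support the test matchers above, which compare the expected and actual
`partitionTimestamps()` maps by value.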