
KAFKA-9535; Update metadata before retrying partitions when fetching offsets (#8088)

Today, if we attempt to list offsets with a fenced leader epoch, the consumer retries without updating its metadata until the timeout is reached. This affects synchronous APIs such as `offsetsForTimes`, `beginningOffsets`, and `endOffsets`. The fix in this patch is to trigger a metadata update whenever we see a retriable error, before making further attempts.

Reviewers: Jason Gustafson <jason@confluent.io>
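
The three synchronous APIs named above share the Fetcher's offset-lookup retry loop, so before this patch each of them could block for its full timeout once a leader epoch was fenced. A minimal usage sketch of those calls follows; the broker address, topic name, and timeout below are placeholders for illustration, not values taken from this commit.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 30000);         // bounds the blocking calls below

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);              // placeholder topic

            // Each of these calls blocks until it has an answer or default.api.timeout.ms expires.
            Map<TopicPartition, OffsetAndTimestamp> byTime =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, 1000L));
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singletonList(tp));
            Map<TopicPartition, Long> latest = consumer.endOffsets(Collections.singletonList(tp));

            System.out.println(byTime + " " + earliest + " " + latest);
        }
    }
}

With this change, a retriable error such as FENCED_LEADER_EPOCH triggers a metadata refresh before the next attempt, so calls like these can recover as soon as fresh leadership information arrives instead of retrying against stale metadata until the timeout.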
Author: Boyang Chen (committed by GitHub)
Commit: 863b534f83
Branch: pull/8128/head
  1. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (125 lines changed)
  2. clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java (14 lines changed)
  3. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (146 lines changed)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (125 lines changed)

@@ -536,26 +536,21 @@ public class Fetcher<K, V> implements Closeable {
             RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
             client.poll(future, timer);
-            if (!future.isDone())
+            if (!future.isDone()) {
                 break;
-            if (future.succeeded()) {
+            } else if (future.succeeded()) {
                 ListOffsetResult value = future.value();
                 result.fetchedOffsets.putAll(value.fetchedOffsets);
                 if (value.partitionsToRetry.isEmpty())
                     return result;
                 remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
+            } else {
+                metadata.requestUpdate();
             }
-            if (metadata.updateRequested())
+            if (remainingToSearch.isEmpty()) {
+                return result;
+            } else {
                 client.awaitMetadataUpdate(timer);
-            else
-                timer.sleep(retryBackoffMs);
+            }
         } while (timer.notExpired());
         throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
@@ -979,63 +974,66 @@ public class Fetcher<K, V> implements Closeable {
             TopicPartition topicPartition = entry.getKey();
             ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
             Errors error = partitionData.error;
-            if (error == Errors.NONE) {
-                if (partitionData.offsets != null) {
-                    // Handle v0 response
-                    long offset;
-                    if (partitionData.offsets.size() > 1) {
-                        future.raise(new IllegalStateException("Unexpected partitionData response of length " +
-                                partitionData.offsets.size()));
-                        return;
-                    } else if (partitionData.offsets.isEmpty()) {
-                        offset = ListOffsetResponse.UNKNOWN_OFFSET;
-                    } else {
-                        offset = partitionData.offsets.get(0);
-                    }
-                    log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
-                            topicPartition, offset);
-                    if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
-                        fetchedOffsets.put(topicPartition, offsetData);
-                    }
-                } else {
-                    // Handle v1 and later response
-                    log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
-                            topicPartition, partitionData.offset, partitionData.timestamp);
-                    if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
-                                partitionData.leaderEpoch);
-                        fetchedOffsets.put(topicPartition, offsetData);
-                    }
-                }
-            } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
-                // The message format on the broker side is before 0.10.0, which means it does not
-                // support timestamps. We treat this case the same as if we weren't able to find an
-                // offset corresponding to the requested timestamp and leave it out of the result.
-                log.debug("Cannot search by timestamp for partition {} because the message format version " +
-                        "is before 0.10.0", topicPartition);
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
-                    error == Errors.REPLICA_NOT_AVAILABLE ||
-                    error == Errors.KAFKA_STORAGE_ERROR ||
-                    error == Errors.OFFSET_NOT_AVAILABLE ||
-                    error == Errors.LEADER_NOT_AVAILABLE ||
-                    error == Errors.UNKNOWN_LEADER_EPOCH) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
-                        topicPartition, error);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.FENCED_LEADER_EPOCH) {
-                log.debug("Attempt to fetch offsets for partition {} failed due to fenced leader epoch, refresh " +
-                        "the metadata and retrying.", topicPartition);
-                metadata.requestUpdate();
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
-                partitionsToRetry.add(topicPartition);
-            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                unauthorizedTopics.add(topicPartition.topic());
-            } else {
-                log.warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.", topicPartition, error.message());
-                partitionsToRetry.add(topicPartition);
+            switch (error) {
+                case NONE:
+                    if (partitionData.offsets != null) {
+                        // Handle v0 response
+                        long offset;
+                        if (partitionData.offsets.size() > 1) {
+                            future.raise(new IllegalStateException("Unexpected partitionData response of length " +
+                                    partitionData.offsets.size()));
+                            return;
+                        } else if (partitionData.offsets.isEmpty()) {
+                            offset = ListOffsetResponse.UNKNOWN_OFFSET;
+                        } else {
+                            offset = partitionData.offsets.get(0);
+                        }
+                        log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
+                                topicPartition, offset);
+                        if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                            ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
+                            fetchedOffsets.put(topicPartition, offsetData);
+                        }
+                    } else {
+                        // Handle v1 and later response
+                        log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
+                                topicPartition, partitionData.offset, partitionData.timestamp);
+                        if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
+                            ListOffsetData offsetData = new ListOffsetData(partitionData.offset, partitionData.timestamp,
+                                    partitionData.leaderEpoch);
+                            fetchedOffsets.put(topicPartition, offsetData);
+                        }
+                    }
+                    break;
+                case UNSUPPORTED_FOR_MESSAGE_FORMAT:
+                    // The message format on the broker side is before 0.10.0, which means it does not
+                    // support timestamps. We treat this case the same as if we weren't able to find an
+                    // offset corresponding to the requested timestamp and leave it out of the result.
+                    log.debug("Cannot search by timestamp for partition {} because the message format version " +
+                            "is before 0.10.0", topicPartition);
+                    break;
+                case NOT_LEADER_FOR_PARTITION:
+                case REPLICA_NOT_AVAILABLE:
+                case KAFKA_STORAGE_ERROR:
+                case OFFSET_NOT_AVAILABLE:
+                case LEADER_NOT_AVAILABLE:
+                case FENCED_LEADER_EPOCH:
+                case UNKNOWN_LEADER_EPOCH:
+                    log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
+                            topicPartition, error);
+                    partitionsToRetry.add(topicPartition);
+                    break;
+                case UNKNOWN_TOPIC_OR_PARTITION:
+                    log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
+                    partitionsToRetry.add(topicPartition);
+                    break;
+                case TOPIC_AUTHORIZATION_FAILED:
+                    unauthorizedTopics.add(topicPartition.topic());
+                    break;
+                default:
+                    log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.",
+                            topicPartition, error.message());
+                    partitionsToRetry.add(topicPartition);
             }
         }
@@ -1239,7 +1237,6 @@ public class Fetcher<K, V> implements Closeable {
                 });
             }
             nextCompletedFetch.initialized = true;
         } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                 error == Errors.REPLICA_NOT_AVAILABLE ||

clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java (14 lines changed)

@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -204,6 +205,19 @@ public class ListOffsetRequest extends AbstractRequest {
             this(timestamp, 1, currentLeaderEpoch);
         }
 
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof PartitionData)) return false;
+            PartitionData other = (PartitionData) obj;
+            return this.timestamp == other.timestamp &&
+                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(timestamp, currentLeaderEpoch);
+        }
+
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
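
The equals and hashCode overrides above give ListOffsetRequest.PartitionData value semantics; it previously inherited identity-based equality from Object, and the new FetcherTest matchers below rely on comparing whole partitionTimestamps() maps. A small self-contained sketch of that idea, using an illustrative class rather than Kafka's actual type:

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Illustrative value class: with equals/hashCode overridden, Map.equals() can be used
// to check that a request carries exactly the expected (timestamp, epoch) per partition.
final class PartitionDataSketch {
    final long timestamp;
    final Optional<Integer> currentLeaderEpoch;

    PartitionDataSketch(long timestamp, Optional<Integer> currentLeaderEpoch) {
        this.timestamp = timestamp;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof PartitionDataSketch)) return false;
        PartitionDataSketch other = (PartitionDataSketch) obj;
        return timestamp == other.timestamp
                && currentLeaderEpoch.equals(other.currentLeaderEpoch);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, currentLeaderEpoch);
    }

    public static void main(String[] args) {
        Map<String, PartitionDataSketch> expected =
                Map.of("topic-0", new PartitionDataSketch(10L, Optional.empty()));
        Map<String, PartitionDataSketch> actual =
                Map.of("topic-0", new PartitionDataSketch(10L, Optional.empty()));
        // Prints true; with the default Object.equals it would print false.
        System.out.println(expected.equals(actual));
    }
}

Map.equals delegates to the element type's equals, which is exactly what the request matchers in the new test depend on.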

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (146 lines changed)

@@ -66,7 +66,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.EpochEndOffset;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -406,13 +405,10 @@ public class FetcherTest {
     }
 
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest fetch = (FetchRequest) body;
-                return fetch.fetchData().containsKey(tp) &&
-                        fetch.fetchData().get(tp).fetchOffset == offset;
-            }
+        return body -> {
+            FetchRequest fetch = (FetchRequest) body;
+            return fetch.fetchData().containsKey(tp) &&
+                    fetch.fetchData().get(tp).fetchOffset == offset;
         };
     }
@@ -2371,13 +2367,9 @@ public class FetcherTest {
     @Test
     public void testGetOffsetsForTimesTimeout() {
-        try {
-            buildFetcher();
-            fetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L));
-            fail("Should throw timeout exception.");
-        } catch (TimeoutException e) {
-            // let it go.
-        }
+        buildFetcher();
+        assertThrows(TimeoutException.class, () -> fetcher.offsetsForTimes(
+            Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), time.timer(100L)));
     }
 
     @Test
@@ -2385,7 +2377,7 @@ public class FetcherTest {
         buildFetcher();
         // Empty map
-        assertTrue(fetcher.offsetsForTimes(new HashMap<TopicPartition, Long>(), time.timer(100L)).isEmpty());
+        assertTrue(fetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());
         // Unknown Offset
         testGetOffsetsForTimesWithUnknownOffset();
         // Error code none with unknown offset
@@ -2421,6 +2413,89 @@ public class FetcherTest {
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
     }
 
+    @Test
+    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
+        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_FOR_PARTITION,
+            Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE,
+            Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
+
+        final int newLeaderEpoch = 3;
+        MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith("dummy", 3,
+            singletonMap(topicName, Errors.NONE), singletonMap(topicName, 4), tp -> newLeaderEpoch);
+
+        Node originalLeader = initialUpdateResponse.cluster().leaderFor(tp1);
+        Node newLeader = updatedMetadata.cluster().leaderFor(tp1);
+        assertNotEquals(originalLeader, newLeader);
+
+        for (Errors retriableError : retriableErrors) {
+            buildFetcher();
+
+            subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
+            client.updateMetadata(initialUpdateResponse);
+
+            final long fetchTimestamp = 10L;
+            Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+            allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
+                Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
+            allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
+                retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+
+            client.prepareResponseFrom(body -> {
+                boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+                if (isListOffsetRequest) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+                    Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>();
+                    expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData(
+                        fetchTimestamp, Optional.empty()));
+                    expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData(
+                        fetchTimestamp, Optional.empty()));
+
+                    return request.partitionTimestamps().equals(expectedTopicPartitions);
+                } else {
+                    return false;
+                }
+            }, new ListOffsetResponse(allPartitionData), originalLeader);
+
+            client.prepareMetadataUpdate(updatedMetadata);
+
+            // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception.
+            // We will count the answered future response in the end to verify if this is the case.
+            Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData);
+            paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
+                Errors.NOT_LEADER_FOR_PARTITION, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+            client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader);
+
+            // The request to new leader must only contain one partition tp1 with error.
+            client.prepareResponseFrom(body -> {
+                boolean isListOffsetRequest = body instanceof ListOffsetRequest;
+                if (isListOffsetRequest) {
+                    ListOffsetRequest request = (ListOffsetRequest) body;
+                    return request.partitionTimestamps().equals(
+                        Collections.singletonMap(tp1, new ListOffsetRequest.PartitionData(
+                            fetchTimestamp, Optional.of(newLeaderEpoch))));
+                } else {
+                    return false;
+                }
+            }, listOffsetResponse(tp1, Errors.NONE, fetchTimestamp, 5L), newLeader);
+
+            Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+                fetcher.offsetsForTimes(
+                    Utils.mkMap(Utils.mkEntry(tp0, fetchTimestamp),
+                        Utils.mkEntry(tp1, fetchTimestamp)), time.timer(Integer.MAX_VALUE));
+
+            assertEquals(Utils.mkMap(
+                Utils.mkEntry(tp0, new OffsetAndTimestamp(4L, fetchTimestamp)),
+                Utils.mkEntry(tp1, new OffsetAndTimestamp(5L, fetchTimestamp))), offsetAndTimestampMap);
+
+            // The NOT_LEADER exception future should not be cleared as we already refreshed the metadata before
+            // first retry, thus never hitting.
+            assertEquals(1, client.numAwaitingResponses());
+
+            fetcher.close();
+        }
+    }
+
     @Test
     public void testGetOffsetsUnknownLeaderEpoch() {
         buildFetcher();
@@ -2616,7 +2691,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
-        currentOffset += commitTransaction(buffer, 1L, currentOffset);
+        commitTransaction(buffer, 1L, currentOffset);
         buffer.flip();
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -2628,13 +2703,10 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest request = (FetchRequest) body;
-                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
         }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
         consumerClient.poll(time.timer(0));
@@ -2766,7 +2838,7 @@ public class FetcherTest {
         for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
             actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
         }
-        assertTrue(actuallyCommittedKeys.equals(committedKeys));
+        assertEquals(actuallyCommittedKeys, committedKeys);
     }
 
     @Test
@@ -3304,13 +3376,10 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FetchRequest request = (FetchRequest) body;
-                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FetchRequest request = (FetchRequest) body;
+            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+            return true;
         }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
         consumerClient.poll(time.timer(0));
@@ -3344,12 +3413,11 @@ public class FetcherTest {
         return appendTransactionalRecords(buffer, pid, baseOffset, (int) baseOffset, records);
     }
 
-    private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+    private void commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
         short producerEpoch = 0;
        int partitionLeaderEpoch = 0;
         MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
                 new EndTransactionMarker(ControlRecordType.COMMIT, 0));
-        return 1;
     }
 
     private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
@@ -3843,12 +3911,10 @@ public class FetcherTest {
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                ListOffsetRequest req = (ListOffsetRequest) body;
-                return timestamp == req.partitionTimestamps().get(tp0).timestamp;
-            }
+        return body -> {
+            ListOffsetRequest req = (ListOffsetRequest) body;
+            return req.partitionTimestamps().equals(Collections.singletonMap(
+                tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.empty())));
         };
     }
