
KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration (#14346)

Implementation of the required functionality for resetting and validating positions in the new async consumer.

This PR includes:
1. New async application events ResetPositionsApplicationEvent and ValidatePositionsApplicationEvent, both handled by the same OffsetsRequestManager.
2. Integration of the reset/validate functionality in the new async consumer, to update fetch positions using the partition offsets (a simplified sketch of this flow follows the list).
3. Minor refactoring to extract functionality reused by both consumer implementations (moving logic without changes from OffsetFetcher into OffsetFetcherUtils, and from OffsetsForLeaderEpochClient into OffsetsForLeaderEpochUtils).
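
For context, the flow these changes introduce can be summarized with a short, self-contained sketch. It only models the queue-and-dispatch shape visible in the diff below (application events routed by an event processor to the offsets request manager); the classes here are simplified stand-ins, not the actual Kafka internals.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the application-event flow added in this PR. The real classes
// live in org.apache.kafka.clients.consumer.internals(.events); these are stand-ins.
public class EventFlowSketch {

    enum Type { RESET_POSITIONS, VALIDATE_POSITIONS }

    static class ApplicationEvent {
        final Type type;
        ApplicationEvent(Type type) { this.type = type; }
    }

    // Stand-in for OffsetsRequestManager: both event types are handled by the same manager.
    static class OffsetsRequestManager {
        void resetPositionsIfNeeded()    { System.out.println("building ListOffsets requests"); }
        void validatePositionsIfNeeded() { System.out.println("building OffsetsForLeaderEpoch requests"); }
    }

    // Stand-in for ApplicationEventProcessor running on the background thread.
    static class ApplicationEventProcessor {
        private final OffsetsRequestManager offsetsRequestManager = new OffsetsRequestManager();

        boolean process(ApplicationEvent event) {
            switch (event.type) {
                case RESET_POSITIONS:
                    offsetsRequestManager.resetPositionsIfNeeded();
                    return true;
                case VALIDATE_POSITIONS:
                    offsetsRequestManager.validatePositionsIfNeeded();
                    return true;
                default:
                    return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
        ApplicationEventProcessor processor = new ApplicationEventProcessor();

        // Polling-thread side: enqueue the events before collecting fetches.
        applicationEventQueue.add(new ApplicationEvent(Type.VALIDATE_POSITIONS));
        applicationEventQueue.add(new ApplicationEvent(Type.RESET_POSITIONS));

        // Background-thread side: drain the queue and dispatch to the request manager.
        while (!applicationEventQueue.isEmpty()) {
            processor.process(applicationEventQueue.take());
        }
    }
}

In the actual changes, PrototypeAsyncConsumer enqueues the two events from poll(), DefaultBackgroundThread drains the queue, and ApplicationEventProcessor routes both event types to OffsetsRequestManager.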

Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao <junrao@gmail.com>
Lianet Magrans authored 1 year ago, committed by GitHub (commit a7e865c0a7)
  1. clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java (5)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java (10)
  3. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java (60)
  4. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java (88)
  5. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java (104)
  6. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java (142)
  7. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java (224)
  8. clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java (34)
  9. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java (2)
  10. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java (14)
  11. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java (2)
  12. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java (2)
  13. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java (2)
  14. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java (30)
  15. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java (30)
  16. clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java (87)
  17. clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java (18)
  18. clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java (315)

5
clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java

@@ -33,6 +33,11 @@ public class LogTruncationException extends OffsetOutOfRangeException {
private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
public LogTruncationException(Map<TopicPartition, Long> fetchOffsets,
Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {
this("Truncated partitions detected with divergent offsets " + divergentOffsets, fetchOffsets, divergentOffsets);
}
public LogTruncationException(String message,
Map<TopicPartition, Long> fetchOffsets,
Map<TopicPartition, OffsetAndMetadata> divergentOffsets) {

10
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java

@@ -48,9 +48,15 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configur
* Background thread runnable that consumes {@code ApplicationEvent} and
* produces {@code BackgroundEvent}. It uses an event loop to consume and
* produce events, and poll the network client to handle network IO.
* <p>
* <p/>
* It holds a reference to the {@link SubscriptionState}, which is
* initialized by the polling thread.
* <p/>
* For processing application events that have been submitted to the
* {@link #applicationEventQueue}, this relies on an {@link ApplicationEventProcessor}. Processing includes generating requests and
* handling responses with the appropriate {@link RequestManager}. The network operations for
* actually sending the requests is delegated to the {@link NetworkClientDelegate}
* </li>
*/
public class DefaultBackgroundThread extends KafkaThread {
private static final long MAX_POLL_TIMEOUT_MS = 5000;
@@ -148,6 +154,7 @@ public class DefaultBackgroundThread extends KafkaThread {
this.groupState = new GroupState(rebalanceConfig);
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
final int requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
OffsetsRequestManager offsetsRequestManager =
new OffsetsRequestManager(
@@ -156,6 +163,7 @@ public class DefaultBackgroundThread extends KafkaThread {
configuredIsolationLevel(config),
time,
retryBackoffMs,
requestTimeoutMs,
apiVersions,
logContext);
CoordinatorRequestManager coordinatorRequestManager = null;

60
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java

@@ -21,12 +21,10 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.OffsetForEpochResult;
import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
@@ -41,13 +39,10 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -70,7 +65,6 @@ public class OffsetFetcher {
private final SubscriptionState subscriptions;
private final ConsumerNetworkClient client;
private final Time time;
private final long retryBackoffMs;
private final long requestTimeoutMs;
private final IsolationLevel isolationLevel;
private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
@@ -91,7 +85,6 @@ public class OffsetFetcher {
this.client = client;
this.metadata = metadata;
this.subscriptions = subscriptions;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.isolationLevel = isolationLevel;
this.apiVersions = apiVersions;
@@ -227,16 +220,12 @@ public class OffsetFetcher {
future.addListener(new RequestFutureListener<ListOffsetResult>() {
@Override
public void onSuccess(ListOffsetResult result) {
offsetFetcherUtils.onSuccessfulRequestForResettingPositions(
resetTimestamps,
result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result);
}
@Override
public void onFailure(RuntimeException e) {
offsetFetcherUtils.onFailedRequestForResettingPositions(
resetTimestamps,
e);
offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
}
});
}
@@ -284,55 +273,18 @@ public class OffsetFetcher {
future.addListener(new RequestFutureListener<OffsetForEpochResult>() {
@Override
public void onSuccess(OffsetForEpochResult offsetsResult) {
List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
if (!offsetsResult.partitionsToRetry().isEmpty()) {
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
}
// For each OffsetsForLeader response, check if the end-offset is lower than our current offset
// for the partition. If so, it means we have experienced log truncation and need to reposition
// that partition's offset.
//
// In addition, check whether the returned offset and epoch are valid. If not, then we should reset
// its offset if reset policy is configured, or throw out of range exception.
offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
FetchPosition requestPosition = fetchPositions.get(topicPartition);
Optional<SubscriptionState.LogTruncation> truncationOpt =
subscriptions.maybeCompleteValidation(topicPartition, requestPosition, respEndOffset);
truncationOpt.ifPresent(truncations::add);
});
if (!truncations.isEmpty()) {
offsetFetcherUtils.maybeSetOffsetForLeaderException(buildLogTruncationException(truncations));
}
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
offsetsResult);
}
@Override
public void onFailure(RuntimeException e) {
subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
if (!(e instanceof RetriableException)) {
offsetFetcherUtils.maybeSetOffsetForLeaderException(e);
}
offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
}
});
});
}
private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>();
Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
for (SubscriptionState.LogTruncation truncation : truncations) {
truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
divergentOffsets.put(truncation.topicPartition, divergentOffset));
truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset);
}
return new LogTruncationException("Detected truncated partitions: " + truncations,
truncatedFetchOffsets, divergentOffsets);
}
/**
* Search the offsets by target times for the specified partitions.
*

88
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java

@@ -18,6 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
@@ -37,9 +39,11 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -59,8 +63,19 @@ class OffsetFetcherUtils {
private final ApiVersions apiVersions;
private final Logger log;
private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
/**
* Exception that occurred while validating positions, that will be propagated on the next
* call to validate positions. This could be an error received in the
* OffsetsForLeaderEpoch response, or a LogTruncationException detected when using a
* successful response to validate the positions. It will be cleared when thrown.
*/
private final AtomicReference<RuntimeException> cachedValidatePositionsException = new AtomicReference<>();
/**
* Exception that occurred while resetting positions, that will be propagated on the next
* call to reset positions. This will have the error received in the response to the
* ListOffsets request. It will be cleared when thrown on the next call to reset.
*/
private final AtomicReference<RuntimeException> cachedResetPositionsException = new AtomicReference<>();
private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
OffsetFetcherUtils(LogContext logContext,
@@ -171,7 +186,7 @@ class OffsetFetcherUtils {
}
Map<TopicPartition, SubscriptionState.FetchPosition> getPartitionsToValidate() {
RuntimeException exception = cachedOffsetForLeaderException.getAndSet(null);
RuntimeException exception = cachedValidatePositionsException.getAndSet(null);
if (exception != null)
throw exception;
@@ -187,9 +202,9 @@ class OffsetFetcherUtils {
.collect(Collectors.toMap(Function.identity(), subscriptionState::position));
}
void maybeSetOffsetForLeaderException(RuntimeException e) {
if (!cachedOffsetForLeaderException.compareAndSet(null, e)) {
log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", e);
void maybeSetValidatePositionsException(RuntimeException e) {
if (!cachedValidatePositionsException.compareAndSet(null, e)) {
log.error("Discarding error validating positions because another error is pending", e);
}
}
@@ -209,7 +224,7 @@ class OffsetFetcherUtils {
Map<TopicPartition, Long> getOffsetResetTimestamp() {
// Raise exception from previous offset fetch if there is one
RuntimeException exception = cachedListOffsetsException.getAndSet(null);
RuntimeException exception = cachedResetPositionsException.getAndSet(null);
if (exception != null)
throw exception;
@@ -285,7 +300,7 @@ class OffsetFetcherUtils {
return null;
}
void onSuccessfulRequestForResettingPositions(
void onSuccessfulResponseForResettingPositions(
final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final ListOffsetResult result) {
if (!result.partitionsToRetry.isEmpty()) {
@@ -304,15 +319,66 @@ class OffsetFetcherUtils {
}
}
void onFailedRequestForResettingPositions(
void onFailedResponseForResettingPositions(
final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final RuntimeException error) {
subscriptionState.requestFailed(resetTimestamps.keySet(), time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
if (!(error instanceof RetriableException) && !cachedListOffsetsException.compareAndSet(null,
if (!(error instanceof RetriableException) && !cachedResetPositionsException.compareAndSet(null,
error))
log.error("Discarding error in ListOffsetResponse because another error is pending", error);
log.error("Discarding error resetting positions because another error is pending",
error);
}
void onSuccessfulResponseForValidatingPositions(
final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
final OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
if (!offsetsResult.partitionsToRetry().isEmpty()) {
subscriptionState.setNextAllowedRetry(offsetsResult.partitionsToRetry(),
time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
}
// For each OffsetsForLeader response, check if the end-offset is lower than our current offset
// for the partition. If so, it means we have experienced log truncation and need to reposition
// that partition's offset.
// In addition, check whether the returned offset and epoch are valid. If not, then we should reset
// its offset if reset policy is configured, or throw out of range exception.
offsetsResult.endOffsets().forEach((topicPartition, respEndOffset) -> {
SubscriptionState.FetchPosition requestPosition = fetchPositions.get(topicPartition);
Optional<SubscriptionState.LogTruncation> truncationOpt =
subscriptionState.maybeCompleteValidation(topicPartition, requestPosition,
respEndOffset);
truncationOpt.ifPresent(truncations::add);
});
if (!truncations.isEmpty()) {
maybeSetValidatePositionsException(buildLogTruncationException(truncations));
}
}
void onFailedResponseForValidatingPositions(final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
final RuntimeException error) {
subscriptionState.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
if (!(error instanceof RetriableException)) {
maybeSetValidatePositionsException(error);
}
}
private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
Map<TopicPartition, OffsetAndMetadata> divergentOffsets = new HashMap<>();
Map<TopicPartition, Long> truncatedFetchOffsets = new HashMap<>();
for (SubscriptionState.LogTruncation truncation : truncations) {
truncation.divergentOffsetOpt.ifPresent(divergentOffset ->
divergentOffsets.put(truncation.topicPartition, divergentOffset));
truncatedFetchOffsets.put(truncation.topicPartition, truncation.fetchPosition.offset);
}
return new LogTruncationException(truncatedFetchOffsets, divergentOffsets);
}
// Visible for testing

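The two AtomicReference fields added above implement a small pattern worth calling out: an error seen while handling an asynchronous response is cached so it can be thrown on the next synchronous call to reset/validate positions, and only the first pending error is kept. A minimal, self-contained sketch of that pattern (class and method names here are illustrative, not the actual Kafka classes):

import java.util.concurrent.atomic.AtomicReference;

// Sketch of the "cache an async error, throw it on the next synchronous call" pattern
// behind cachedResetPositionsException / cachedValidatePositionsException.
public class CachedExceptionSketch {

    private final AtomicReference<RuntimeException> cachedException = new AtomicReference<>();

    // Called from the response handler: keep only the first error seen.
    void maybeSetException(RuntimeException e) {
        if (!cachedException.compareAndSet(null, e)) {
            System.err.println("Discarding error because another error is pending: " + e);
        }
    }

    // Called at the start of the next reset/validate attempt: throw and clear the error.
    void throwIfPendingError() {
        RuntimeException exception = cachedException.getAndSet(null);
        if (exception != null)
            throw exception;
    }

    public static void main(String[] args) {
        CachedExceptionSketch sketch = new CachedExceptionSketch();
        sketch.maybeSetException(new IllegalStateException("first error wins"));
        sketch.maybeSetException(new IllegalStateException("discarded"));
        try {
            sketch.throwIfPendingError();
        } catch (RuntimeException e) {
            System.out.println("Propagated on next call: " + e.getMessage());
        }
        sketch.throwIfPendingError(); // already cleared, so nothing is thrown this time
    }
}
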
104
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java

@@ -18,23 +18,12 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.utils.LogContext;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Convenience class for making asynchronous requests to the OffsetsForLeaderEpoch API
@@ -43,7 +32,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
Map<TopicPartition, SubscriptionState.FetchPosition>,
OffsetsForLeaderEpochRequest,
OffsetsForLeaderEpochResponse,
OffsetsForLeaderEpochClient.OffsetForEpochResult> {
OffsetsForLeaderEpochUtils.OffsetForEpochResult> {
OffsetsForLeaderEpochClient(ConsumerNetworkClient client, LogContext logContext) {
super(client, logContext);
@@ -52,98 +41,15 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
@Override
protected AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
Node node, Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(requestData.size());
requestData.forEach((topicPartition, fetchPosition) ->
fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
OffsetForLeaderTopic topic = topics.find(topicPartition.topic());
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic());
topics.add(topic);
}
topic.partitions().add(new OffsetForLeaderPartition()
.setPartition(topicPartition.partition())
.setLeaderEpoch(fetchEpoch)
.setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
);
})
);
return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
return OffsetsForLeaderEpochUtils.prepareRequest(requestData);
}
@Override
protected OffsetForEpochResult handleResponse(
protected OffsetsForLeaderEpochUtils.OffsetForEpochResult handleResponse(
Node node,
Map<TopicPartition, SubscriptionState.FetchPosition> requestData,
OffsetsForLeaderEpochResponse response) {
Set<TopicPartition> partitionsToRetry = new HashSet<>(requestData.keySet());
Set<String> unauthorizedTopics = new HashSet<>();
Map<TopicPartition, EpochEndOffset> endOffsets = new HashMap<>();
for (OffsetForLeaderTopicResult topic : response.data().topics()) {
for (EpochEndOffset partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.topic(), partition.partition());
if (!requestData.containsKey(topicPartition)) {
logger().warn("Received unrequested topic or partition {} from response, ignoring.", topicPartition);
continue;
}
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
logger().debug("Handling OffsetsForLeaderEpoch response for {}. Got offset {} for epoch {}.",
topicPartition, partition.endOffset(), partition.leaderEpoch());
endOffsets.put(topicPartition, partition);
partitionsToRetry.remove(topicPartition);
break;
case NOT_LEADER_OR_FOLLOWER:
case REPLICA_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case OFFSET_NOT_AVAILABLE:
case LEADER_NOT_AVAILABLE:
case FENCED_LEADER_EPOCH:
case UNKNOWN_LEADER_EPOCH:
logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
topicPartition, error);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}.",
topicPartition);
break;
case TOPIC_AUTHORIZATION_FAILED:
unauthorizedTopics.add(topicPartition.topic());
partitionsToRetry.remove(topicPartition);
break;
default:
logger().warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.",
topicPartition, error.message());
}
}
}
if (!unauthorizedTopics.isEmpty())
throw new TopicAuthorizationException(unauthorizedTopics);
else
return new OffsetForEpochResult(endOffsets, partitionsToRetry);
}
public static class OffsetForEpochResult {
private final Map<TopicPartition, EpochEndOffset> endOffsets;
private final Set<TopicPartition> partitionsToRetry;
OffsetForEpochResult(Map<TopicPartition, EpochEndOffset> endOffsets, Set<TopicPartition> partitionsNeedingRetry) {
this.endOffsets = endOffsets;
this.partitionsToRetry = partitionsNeedingRetry;
}
public Map<TopicPartition, EpochEndOffset> endOffsets() {
return endOffsets;
}
public Set<TopicPartition> partitionsToRetry() {
return partitionsToRetry;
}
return OffsetsForLeaderEpochUtils.handleResponse(requestData, response);
}
}
}

142
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Utility methods for preparing requests to the OffsetsForLeaderEpoch API and handling responses.
*/
public final class OffsetsForLeaderEpochUtils {
private static final Logger LOG = LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class);
private OffsetsForLeaderEpochUtils(){}
static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(requestData.size());
requestData.forEach((topicPartition, fetchPosition) ->
fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
OffsetForLeaderTopic topic = topics.find(topicPartition.topic());
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic());
topics.add(topic);
}
topic.partitions().add(new OffsetForLeaderPartition()
.setPartition(topicPartition.partition())
.setLeaderEpoch(fetchEpoch)
.setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
);
})
);
return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
}
public static OffsetForEpochResult handleResponse(
Map<TopicPartition, SubscriptionState.FetchPosition> requestData,
OffsetsForLeaderEpochResponse response) {
Set<TopicPartition> partitionsToRetry = new HashSet<>(requestData.keySet());
Set<String> unauthorizedTopics = new HashSet<>();
Map<TopicPartition, EpochEndOffset> endOffsets = new HashMap<>();
for (OffsetForLeaderTopicResult topic : response.data().topics()) {
for (EpochEndOffset partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.topic(), partition.partition());
if (!requestData.containsKey(topicPartition)) {
LOG.warn("Received unrequested topic or partition {} from response, ignoring.", topicPartition);
continue;
}
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
LOG.debug("Handling OffsetsForLeaderEpoch response for {}. Got offset {} for epoch {}.",
topicPartition, partition.endOffset(), partition.leaderEpoch());
endOffsets.put(topicPartition, partition);
partitionsToRetry.remove(topicPartition);
break;
case NOT_LEADER_OR_FOLLOWER:
case REPLICA_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case OFFSET_NOT_AVAILABLE:
case LEADER_NOT_AVAILABLE:
case FENCED_LEADER_EPOCH:
case UNKNOWN_LEADER_EPOCH:
LOG.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
topicPartition, error);
break;
case UNKNOWN_TOPIC_OR_PARTITION:
LOG.warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}.",
topicPartition);
break;
case TOPIC_AUTHORIZATION_FAILED:
unauthorizedTopics.add(topicPartition.topic());
partitionsToRetry.remove(topicPartition);
break;
default:
LOG.warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.",
topicPartition, error.message());
}
}
}
if (!unauthorizedTopics.isEmpty())
throw new TopicAuthorizationException(unauthorizedTopics);
return new OffsetForEpochResult(endOffsets, partitionsToRetry);
}
static class OffsetForEpochResult {
private final Map<TopicPartition, EpochEndOffset> endOffsets;
private final Set<TopicPartition> partitionsToRetry;
OffsetForEpochResult(Map<TopicPartition, EpochEndOffset> endOffsets, Set<TopicPartition> partitionsNeedingRetry) {
this.endOffsets = endOffsets;
this.partitionsToRetry = partitionsNeedingRetry;
}
public Map<TopicPartition, EpochEndOffset> endOffsets() {
return endOffsets;
}
public Set<TopicPartition> partitionsToRetry() {
return partitionsToRetry;
}
}
}

224
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java

@@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
@@ -28,8 +30,11 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@@ -48,6 +53,8 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion;
import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader;
/**
* Manager responsible for building the following requests to retrieve partition offsets, and
@@ -68,15 +75,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
private final IsolationLevel isolationLevel;
private final Logger log;
private final OffsetFetcherUtils offsetFetcherUtils;
private final SubscriptionState subscriptionState;
private final Set<ListOffsetsRequestState> requestsToRetry;
private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
private final long requestTimeoutMs;
private final Time time;
private final ApiVersions apiVersions;
public OffsetsRequestManager(final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final IsolationLevel isolationLevel,
final Time time,
final long retryBackoffMs,
final long requestTimeoutMs,
final ApiVersions apiVersions,
final LogContext logContext) {
requireNonNull(subscriptionState);
@@ -91,6 +103,10 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
this.log = logContext.logger(getClass());
this.requestsToRetry = new HashSet<>();
this.requestsToSend = new ArrayList<>();
this.subscriptionState = subscriptionState;
this.time = time;
this.requestTimeoutMs = requestTimeoutMs;
this.apiVersions = apiVersions;
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
time, retryBackoffMs, apiVersions);
// Register the cluster metadata update callback. Note this only relies on the
@@ -154,6 +170,52 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets));
}
/**
* Reset offsets for all assigned partitions that require it. Offsets will be reset
* with timestamps according to the reset strategy defined for each partition. This will
* generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent
* on the next call to {@link #poll(long)}.
*
* <p/>
*
* When a response is received, positions are updated in-memory, on the subscription state. If
* an error is received in the response, it will be saved to be thrown on the next call to
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
*/
public void resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
if (offsetResetTimestamps.isEmpty())
return;
List<NetworkClientDelegate.UnsentRequest> unsentRequests =
buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
requestsToSend.addAll(unsentRequests);
}
/**
* Validate positions for all assigned partitions for which a leader change has been detected.
* This will generate OffsetsForLeaderEpoch requests for the partitions, with the known offset
* epoch and current leader epoch. It will enqueue the generated requests, to be sent on the
* next call to {@link #poll(long)}.
*
* <p/>
*
* When a response is received, positions are validated and, if a log truncation is
* detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
* next call to this function.
*/
public void validatePositionsIfNeeded() {
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
offsetFetcherUtils.getPartitionsToValidate();
if (partitionsToValidate.isEmpty()) {
return;
}
List<NetworkClientDelegate.UnsentRequest> unsentRequests =
buildOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
requestsToSend.addAll(unsentRequests);
}
/**
* Generate requests for partitions with known leaders. Update the listOffsetsRequestState by adding
* partitions with unknown leader to the listOffsetsRequestState.remainingToSearch
@@ -205,14 +267,14 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
final boolean requireTimestamps,
final ListOffsetsRequestState listOffsetsRequestState) {
log.debug("Building ListOffsets request for partitions {}", timestampsToSearch);
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> partitionResetTimestampsByNode =
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.of(listOffsetsRequestState));
if (partitionResetTimestampsByNode.isEmpty()) {
if (timestampsToSearchByNode.isEmpty()) {
throw new StaleMetadataException();
}
final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
MultiNodeRequest multiNodeRequest = new MultiNodeRequest(partitionResetTimestampsByNode.size());
MultiNodeRequest multiNodeRequest = new MultiNodeRequest(timestampsToSearchByNode.size());
multiNodeRequest.onComplete((multiNodeResult, error) -> {
// Done sending request to a set of known leaders
if (error == null) {
@@ -235,7 +297,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
}
});
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : partitionResetTimestampsByNode.entrySet()) {
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
@@ -268,7 +330,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
.forConsumer(requireTimestamps, isolationLevel, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
log.debug("Creating ListOffsetRequest {} for broker {} to reset positions", builder,
log.debug("Creating ListOffset request {} for broker {} to reset positions", builder,
node);
NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
@@ -278,7 +340,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
CompletableFuture<ListOffsetResult> result = new CompletableFuture<>();
unsentRequest.future().whenComplete((response, error) -> {
if (error != null) {
log.debug("Sending ListOffsetRequest {} to broker {} failed",
log.debug("Sending ListOffset request {} to broker {} failed",
builder,
node,
error);
@@ -298,6 +360,154 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
return result;
}
/**
* Make asynchronous ListOffsets request to fetch offsets by target times for the specified
* partitions.
* Use the retrieved offsets to reset positions in the subscription state.
*
* @param timestampsToSearch the mapping between partitions and target time
* @return A list of
* {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
* that can be polled to obtain the corresponding timestamps and offsets.
*/
private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
final Map<TopicPartition, Long> timestampsToSearch) {
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.empty());
final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
time.milliseconds() + requestTimeoutMs);
CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
node,
resetTimestamps,
false,
unsentRequests);
partialResult.whenComplete((result, error) -> {
if (error == null) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
} else {
RuntimeException e;
if (error instanceof RuntimeException) {
e = (RuntimeException) error;
} else {
e = new RuntimeException("Unexpected failure in ListOffsets request for " +
"resetting positions", error);
}
offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
}
});
});
return unsentRequests;
}
/**
* For each partition that needs validation, make an asynchronous request to get the end-offsets
* for the partition with the epoch less than or equal to the epoch the partition last saw.
* <p/>
* Requests are grouped by Node for efficiency.
*/
private List<NetworkClientDelegate.UnsentRequest> buildOffsetsForLeaderEpochRequestsAndValidatePositions(
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
regroupFetchPositionsByLeader(partitionsToValidate);
long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
regrouped.forEach((node, fetchPositions) -> {
if (node.isEmpty()) {
metadata.requestUpdate(true);
return;
}
NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
if (nodeApiVersions == null) {
return;
}
if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
"support the required protocol version (introduced in Kafka 2.3)",
fetchPositions.keySet());
for (TopicPartition partition : fetchPositions.keySet()) {
subscriptionState.completeValidation(partition);
}
return;
}
subscriptionState.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);
CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> partialResult =
buildOffsetsForLeaderEpochRequestToNode(node, fetchPositions, unsentRequests);
partialResult.whenComplete((offsetsResult, error) -> {
if (error == null) {
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
offsetsResult);
} else {
RuntimeException e;
if (error instanceof RuntimeException) {
e = (RuntimeException) error;
} else {
e = new RuntimeException("Unexpected failure in OffsetsForLeaderEpoch " +
"request for validating positions", error);
}
offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
}
});
});
return unsentRequests;
}
/**
* Build OffsetsForLeaderEpoch request to send to a specific broker for the partitions and
* positions to fetch. This also adds the request to the list of unsentRequests.
**/
private CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> buildOffsetsForLeaderEpochRequestToNode(
final Node node,
final Map<TopicPartition, SubscriptionState.FetchPosition> fetchPositions,
List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
AbstractRequest.Builder<OffsetsForLeaderEpochRequest> builder =
OffsetsForLeaderEpochUtils.prepareRequest(fetchPositions);
log.debug("Creating OffsetsForLeaderEpoch request request {} to broker {}", builder, node);
NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
builder,
Optional.ofNullable(node));
unsentRequests.add(unsentRequest);
CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> result = new CompletableFuture<>();
unsentRequest.future().whenComplete((response, error) -> {
if (error != null) {
log.debug("Sending OffsetsForLeaderEpoch request {} to broker {} failed",
builder,
node,
error);
result.completeExceptionally(error);
} else {
OffsetsForLeaderEpochResponse offsetsForLeaderEpochResponse = (OffsetsForLeaderEpochResponse) response.responseBody();
log.trace("Received OffsetsForLeaderEpoch response {} from broker {}", offsetsForLeaderEpochResponse, node);
try {
OffsetsForLeaderEpochUtils.OffsetForEpochResult listOffsetResult =
OffsetsForLeaderEpochUtils.handleResponse(fetchPositions, offsetsForLeaderEpochResponse);
result.complete(listOffsetResult);
} catch (RuntimeException e) {
result.completeExceptionally(e);
}
}
});
return result;
}
private static class ListOffsetsRequestState {
private final Map<TopicPartition, Long> timestampsToSearch;
@@ -383,7 +593,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
if (!leaderAndEpoch.leader.isPresent()) {
log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
metadata.requestUpdate(false);
metadata.requestUpdate(true);
listOffsetsRequestState.ifPresent(offsetsRequestState -> offsetsRequestState.remainingToSearch.put(tp, offset));
} else {
int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);

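Both buildListOffsetsRequestsAndResetPositions and buildOffsetsForLeaderEpochRequestsAndValidatePositions above follow the same completion pattern: each unsent request carries a CompletableFuture, and whenComplete routes a success to the onSuccessfulResponse... callback while failures are normalized to a RuntimeException before being handed to the onFailedResponse... callback. A standalone sketch of that routing (the names below are illustrative stand-ins, not the actual Kafka API):

import java.util.concurrent.CompletableFuture;

// Sketch of the whenComplete routing used when a ListOffsets or OffsetsForLeaderEpoch
// response (or failure) arrives for a previously built unsent request.
public class ResponseRoutingSketch {

    static void onSuccess(String result) { System.out.println("update positions with " + result); }
    static void onFailure(RuntimeException e) { System.out.println("cache error: " + e.getMessage()); }

    static void route(CompletableFuture<String> partialResult) {
        partialResult.whenComplete((result, error) -> {
            if (error == null) {
                onSuccess(result);
            } else {
                // Normalize any throwable to a RuntimeException, as the request manager does.
                RuntimeException e = (error instanceof RuntimeException)
                        ? (RuntimeException) error
                        : new RuntimeException("Unexpected failure in offsets request", error);
                onFailure(e);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = new CompletableFuture<>();
        route(ok);
        ok.complete("ListOffsets response");

        CompletableFuture<String> failed = new CompletableFuture<>();
        route(failed);
        failed.completeExceptionally(new Exception("disconnected")); // checked exception gets wrapped
    }
}
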
34
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java

@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -35,6 +36,8 @@ import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -193,6 +196,9 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
// be processed in the collectFetches().
backgroundEvent.ifPresent(event -> processEvent(event, timeout));
}
updateFetchPositionsIfNeeded();
// The idea here is to have the background thread sending fetches autonomously, and the fetcher
// uses the poll loop to retrieve successful fetchResponse and process them on the polling thread.
final Fetch<K, V> fetch = collectFetches();
@@ -209,6 +215,34 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
return ConsumerRecords.empty();
}
/**
* Set the fetch position to the committed position (if there is one) or reset it using the
* offset reset policy the user has configured (if partitions require reset)
*
* @return true if the operation completed without timing out
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
*/
private boolean updateFetchPositionsIfNeeded() {
// If any partitions have been truncated due to a leader change, we need to validate the offsets
ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
eventHandler.add(validatePositionsEvent);
// TODO: integrate logic for refreshing committed offsets if available
// If there are partitions still needing a position and a reset policy is defined,
// request reset using the default policy. If no reset strategy is defined and there
// are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
subscriptions.resetInitializingPositions();
// Finally send an asynchronous request to look up and update the positions of any
// partitions which are awaiting reset.
ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent();
eventHandler.add(resetPositionsEvent);
return true;
}
/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
* partitions.

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java

@@ -33,6 +33,6 @@ abstract public class ApplicationEvent {
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
}
}

14
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

@@ -62,6 +62,10 @@ public class ApplicationEventProcessor {
return process((AssignmentChangeApplicationEvent) event);
case LIST_OFFSETS:
return process((ListOffsetsApplicationEvent) event);
case RESET_POSITIONS:
return processResetPositionsEvent();
case VALIDATE_POSITIONS:
return processValidatePositionsEvent();
}
return false;
}
@@ -140,4 +144,14 @@ public class ApplicationEventProcessor {
event.chain(future);
return true;
}
private boolean processResetPositionsEvent() {
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
return true;
}
private boolean processValidatePositionsEvent() {
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
return true;
}
}

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java

@@ -91,7 +91,7 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
@Override
public String toString() {
return "CompletableApplicationEvent{" +
return getClass().getSimpleName() + "{" +
"future=" + future +
", type=" + type +
'}';

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java

@@ -85,7 +85,7 @@ public class ListOffsetsApplicationEvent extends CompletableApplicationEvent<Map
@Override
public String toString() {
return "ListOffsetsApplicationEvent {" +
return getClass().getSimpleName() + " {" +
"timestampsToSearch=" + timestampsToSearch + ", " +
"requireTimestamps=" + requireTimestamps + '}';
}

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java

@@ -55,7 +55,7 @@ public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map
@Override
public String toString() {
return "OffsetFetchApplicationEvent{" +
return getClass().getSimpleName() + "{" +
"partitions=" + partitions +
", future=" + future +
", type=" + type +

30
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
/**
* Event for resetting offsets for all assigned partitions that require it. This is an
* asynchronous event that generates ListOffsets requests, and completes by updating in-memory
* positions when responses are received.
*/
public class ResetPositionsApplicationEvent extends CompletableApplicationEvent<Void> {
public ResetPositionsApplicationEvent() {
super(Type.RESET_POSITIONS);
}
}

30
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
/**
* Event for validating offsets for all assigned partitions for which a leader change has been
* detected. This is an asynchronous event that generates OffsetForLeaderEpoch requests, and
* completes by validating in-memory positions against the offsets received in the responses.
*/
public class ValidatePositionsApplicationEvent extends CompletableApplicationEvent<Void> {
public ValidatePositionsApplicationEvent() {
super(Type.VALIDATE_POSITIONS);
}
}

87
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java

@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
@@ -27,7 +28,10 @@ import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -53,9 +57,11 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -181,6 +187,87 @@ public class DefaultBackgroundThreadTest {
assertTrue(applicationEventsQueue.isEmpty());
backgroundThread.close();
}
@Test
public void testResetPositionsEventIsProcessed() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ResetPositionsApplicationEvent e = new ResetPositionsApplicationEvent();
this.applicationEventsQueue.add(e);
backgroundThread.runOnce();
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
assertTrue(applicationEventsQueue.isEmpty());
backgroundThread.close();
}
@Test
public void testResetPositionsProcessFailure() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
applicationEventProcessor = spy(new ApplicationEventProcessor(
this.backgroundEventsQueue,
mockRequestManagers(),
metadata));
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
TopicAuthorizationException authException = new TopicAuthorizationException("Topic authorization failed");
doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded();
ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent();
this.applicationEventsQueue.add(event);
assertThrows(TopicAuthorizationException.class, backgroundThread::runOnce);
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
backgroundThread.close();
}
@Test
public void testValidatePositionsEventIsProcessed() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
ValidatePositionsApplicationEvent e = new ValidatePositionsApplicationEvent();
this.applicationEventsQueue.add(e);
backgroundThread.runOnce();
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
assertTrue(applicationEventsQueue.isEmpty());
backgroundThread.close();
}
@Test
public void testValidatePositionsProcessFailure() {
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult());
this.applicationEventsQueue = new LinkedBlockingQueue<>();
this.backgroundEventsQueue = new LinkedBlockingQueue<>();
applicationEventProcessor = spy(new ApplicationEventProcessor(
this.backgroundEventsQueue,
mockRequestManagers(),
metadata));
DefaultBackgroundThread backgroundThread = mockBackgroundThread();
LogTruncationException logTruncationException = new LogTruncationException(Collections.emptyMap(), Collections.emptyMap());
doThrow(logTruncationException).when(offsetsRequestManager).validatePositionsIfNeeded();
ValidatePositionsApplicationEvent event = new ValidatePositionsApplicationEvent();
this.applicationEventsQueue.add(event);
assertThrows(LogTruncationException.class, backgroundThread::runOnce);
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
backgroundThread.close();
}
@Test
public void testAssignmentChangeEvent() {
this.applicationEventsQueue = new LinkedBlockingQueue<>();

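The four tests above pin down the dispatch contract for the new events: the processor must route them to OffsetsRequestManager and must not swallow the exceptions it throws, so runOnce() rethrows them to the caller. A minimal sketch of that routing, assuming the processor receives an already-dequeued event (the example class name is hypothetical):

package org.apache.kafka.clients.consumer.internals;

// Sketch only: route the two new events to OffsetsRequestManager and let any
// exception (e.g. TopicAuthorizationException, LogTruncationException) propagate.
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;

class PositionsEventDispatchExample {

    private final OffsetsRequestManager offsetsRequestManager;

    PositionsEventDispatchExample(OffsetsRequestManager offsetsRequestManager) {
        this.offsetsRequestManager = offsetsRequestManager;
    }

    void process(ApplicationEvent event) {
        if (event instanceof ResetPositionsApplicationEvent) {
            // Builds ListOffsets requests for all partitions awaiting an offset reset.
            offsetsRequestManager.resetPositionsIfNeeded();
        } else if (event instanceof ValidatePositionsApplicationEvent) {
            // Builds OffsetsForLeaderEpoch requests for partitions needing validation.
            offsetsRequestManager.validatePositionsIfNeeded();
        }
        // Other event types are handled elsewhere in the real processor.
    }
}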
18
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java

@@ -55,7 +55,7 @@ public class OffsetForLeaderEpochClientTest {
@Test
public void testEmptyResponse() {
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), Collections.emptyMap());
OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(
@@ -63,7 +63,7 @@ public class OffsetForLeaderEpochClientTest {
client.prepareResponse(resp);
consumerClient.pollNoWakeup();
OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value();
OffsetsForLeaderEpochUtils.OffsetForEpochResult result = future.value();
assertTrue(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().isEmpty());
}
@@ -75,7 +75,7 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(
@@ -83,7 +83,7 @@ public class OffsetForLeaderEpochClientTest {
client.prepareResponse(resp);
consumerClient.pollNoWakeup();
OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value();
OffsetsForLeaderEpochUtils.OffsetForEpochResult result = future.value();
assertFalse(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().isEmpty());
}
@@ -95,14 +95,14 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
client.prepareResponse(prepareOffsetForLeaderEpochResponse(
tp0, Errors.NONE, 1, 10L));
consumerClient.pollNoWakeup();
OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value();
OffsetsForLeaderEpochUtils.OffsetForEpochResult result = future.value();
assertTrue(result.partitionsToRetry().isEmpty());
assertTrue(result.endOffsets().containsKey(tp0));
assertEquals(result.endOffsets().get(tp0).errorCode(), Errors.NONE.code());
@@ -117,7 +117,7 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
client.prepareResponse(prepareOffsetForLeaderEpochResponse(
@@ -136,7 +136,7 @@ public class OffsetForLeaderEpochClientTest {
new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1))));
OffsetsForLeaderEpochClient offsetClient = newOffsetClient();
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
RequestFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> future =
offsetClient.sendAsyncRequest(Node.noNode(), positionMap);
client.prepareResponse(prepareOffsetForLeaderEpochResponse(
@@ -144,7 +144,7 @@ public class OffsetForLeaderEpochClientTest {
consumerClient.pollNoWakeup();
assertFalse(future.failed());
OffsetsForLeaderEpochClient.OffsetForEpochResult result = future.value();
OffsetsForLeaderEpochUtils.OffsetForEpochResult result = future.value();
assertTrue(result.partitionsToRetry().contains(tp0));
assertFalse(result.endOffsets().containsKey(tp0));
}

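The renames above reflect the refactor described in the commit message: OffsetForEpochResult now lives in OffsetsForLeaderEpochUtils so both consumer implementations can share it. Judging by the assertions in this test, a consumer of the result works with its endOffsets() map and partitionsToRetry() set, roughly along these lines (the handler class and the truncation check are illustrative assumptions):

package org.apache.kafka.clients.consumer.internals;

// Sketch only: consume the shared OffsetForEpochResult produced from an
// OffsetsForLeaderEpoch response. The accessors match the ones used in the test
// above; the truncation-detection logic is an assumption for illustration.
import org.apache.kafka.common.protocol.Errors;

class OffsetForEpochResultExample {

    static void handle(OffsetsForLeaderEpochUtils.OffsetForEpochResult result) {
        // Partitions that hit retriable errors and should be validated again later.
        if (!result.partitionsToRetry().isEmpty()) {
            System.out.println("Will retry validation for: " + result.partitionsToRetry());
        }

        // End offsets reported by the leader for each requested epoch.
        result.endOffsets().forEach((partition, endOffset) -> {
            if (endOffset.errorCode() == Errors.NONE.code()) {
                // Compare the returned end offset with the in-memory fetch position
                // to decide whether the log was truncated under this partition.
                System.out.println(partition + " -> end offset " + endOffset.endOffset());
            }
        });
    }
}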
315
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java

@@ -19,7 +19,9 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.IsolationLevel;
@@ -29,11 +31,14 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -64,7 +69,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -75,6 +83,7 @@ public class OffsetsRequestManagerTest {
private ConsumerMetadata metadata;
private SubscriptionState subscriptionState;
private MockTime time;
private ApiVersions apiVersions;
private static final String TEST_TOPIC = "t1";
private static final TopicPartition TEST_PARTITION_1 = new TopicPartition(TEST_TOPIC, 1);
private static final TopicPartition TEST_PARTITION_2 = new TopicPartition(TEST_TOPIC, 2);
@@ -82,15 +91,16 @@ public class OffsetsRequestManagerTest {
private static final Node LEADER_2 = new Node(0, "host2", 9092);
private static final IsolationLevel DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_COMMITTED;
private static final int RETRY_BACKOFF_MS = 500;
private static final int REQUEST_TIMEOUT_MS = 500;
@BeforeEach
public void setup() {
metadata = mock(ConsumerMetadata.class);
subscriptionState = mock(SubscriptionState.class);
this.time = new MockTime(0);
ApiVersions apiVersions = mock(ApiVersions.class);
apiVersions = mock(ApiVersions.class);
requestManager = new OffsetsRequestManager(subscriptionState, metadata,
DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS,
DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS,
apiVersions, new LogContext());
}
@@ -99,7 +109,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1,
ListOffsetsRequest.EARLIEST_TIMESTAMP);
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -116,11 +126,12 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// Building list offsets request fails with unknown leader
expectFailedRequest_MissingLeader();
mockFailedRequest_MissingLeader();
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture =
requestManager.fetchOffsets(timestampsToSearch, false);
assertEquals(0, requestManager.requestsToSend());
assertEquals(1, requestManager.requestsToRetry());
verify(metadata).requestUpdate(true);
NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
// Metadata update not happening within the time boundaries of the request future, so
@@ -139,7 +150,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
partitionLeaders.put(TEST_PARTITION_2, LEADER_1);
expectSuccessfulRequest(partitionLeaders);
mockSuccessfulRequest(partitionLeaders);
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -176,7 +187,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1,
ListOffsetsRequest.EARLIEST_TIMESTAMP);
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -204,19 +215,21 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// Building list offsets request fails with unknown leader
expectFailedRequest_MissingLeader();
mockFailedRequest_MissingLeader();
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture =
requestManager.fetchOffsets(timestampsToSearch,
false);
assertEquals(0, requestManager.requestsToSend());
assertEquals(1, requestManager.requestsToRetry());
verify(metadata).requestUpdate(true);
NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
assertFalse(fetchOffsetsFuture.isDone());
// Cluster metadata update. Previously failed attempt to build the request should be retried
// and succeed
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
@@ -232,7 +245,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -255,7 +268,7 @@ public class OffsetsRequestManagerTest {
assertEquals(0, requestManager.requestsToSend());
// Cluster metadata update. Failed requests should be retried and succeed
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
@@ -280,7 +293,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -317,7 +330,7 @@ public class OffsetsRequestManagerTest {
Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
partitionLeaders.put(TEST_PARTITION_2, LEADER_2);
expectSuccessfulRequest(partitionLeaders);
mockSuccessfulRequest(partitionLeaders);
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = requestManager.fetchOffsets(
timestampsToSearch,
false);
@@ -347,7 +360,7 @@ public class OffsetsRequestManagerTest {
assertEquals(0, requestManager.requestsToSend());
// Cluster metadata update. Failed requests should be retried
expectSuccessfulRequest(partitionLeaders);
mockSuccessfulRequest(partitionLeaders);
requestManager.onUpdate(new ClusterResource(""));
assertEquals(1, requestManager.requestsToSend());
@@ -370,7 +383,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture =
requestManager.fetchOffsets(
timestampsToSearch,
@@ -399,7 +412,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture =
requestManager.fetchOffsets(
timestampsToSearch,
@@ -431,7 +444,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsRequest.EARLIEST_TIMESTAMP);
// List offsets request successfully built
expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture =
requestManager.fetchOffsets(
timestampsToSearch,
@@ -455,6 +468,192 @@ public class OffsetsRequestManagerTest {
assertEquals(0, requestManager.requestsToSend());
}
@Test
public void testResetPositionsSendNoRequestIfNoPartitionsNeedingReset() {
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.emptySet());
requestManager.resetPositionsIfNeeded();
assertEquals(0, requestManager.requestsToSend());
}
@Test
public void testResetPositionsMissingLeader() {
mockFailedRequest_MissingLeader();
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
requestManager.resetPositionsIfNeeded();
verify(metadata).requestUpdate(true);
assertEquals(0, requestManager.requestsToSend());
}
@Test
public void testResetPositionsSuccess_NoLeaderEpochInResponse() {
testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch.noLeaderOrEpoch());
verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
}
@Test
public void testResetPositionsSuccess_LeaderEpochInResponse() {
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
Optional.of(5));
testResetPositionsSuccessWithLeaderEpoch(leaderAndEpoch);
verify(metadata).updateLastSeenEpochIfNewer(TEST_PARTITION_1, leaderAndEpoch.epoch.get());
}
@Test
public void testResetPositionsThrowsPreviousException() {
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
requestManager.resetPositionsIfNeeded();
// Reset positions response with TopicAuthorizationException
NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
ClientResponse clientResponse = buildClientResponseWithErrors(
unsentRequest, Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
clientResponse.onComplete();
assertTrue(unsentRequest.future().isDone());
assertFalse(unsentRequest.future().isCompletedExceptionally());
verify(subscriptionState).requestFailed(any(), anyLong());
verify(metadata).requestUpdate(false);
// Following resetPositions should raise the previous exception without performing any
// request
assertThrows(TopicAuthorizationException.class,
() -> requestManager.resetPositionsIfNeeded());
assertEquals(0, requestManager.requestsToSend());
}
@Test
public void testValidatePositionsSuccess() {
int currentOffset = 5;
int expectedEndOffset = 100;
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
Optional.of(3));
TopicPartition tp = TEST_PARTITION_1;
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset,
Optional.of(10), leaderAndEpoch);
mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
requestManager.validatePositionsIfNeeded();
assertEquals(1, requestManager.requestsToSend(), "Invalid request count");
verify(subscriptionState).setNextAllowedRetry(any(), anyLong());
// Validate positions response with end offsets
when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(LEADER_1, leaderAndEpoch.epoch));
NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
ClientResponse clientResponse = buildOffsetsForLeaderEpochResponse(unsentRequest,
Collections.singletonList(tp), expectedEndOffset);
clientResponse.onComplete();
assertTrue(unsentRequest.future().isDone());
assertFalse(unsentRequest.future().isCompletedExceptionally());
verify(subscriptionState).maybeCompleteValidation(any(), any(), any());
}
@Test
public void testValidatePositionsMissingLeader() {
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(Node.noNode()),
Optional.of(5));
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(5L,
Optional.of(10), leaderAndEpoch);
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
when(subscriptionState.position(any())).thenReturn(position, position);
NodeApiVersions nodeApiVersions = NodeApiVersions.create();
when(apiVersions.get(LEADER_1.idString())).thenReturn(nodeApiVersions);
requestManager.validatePositionsIfNeeded();
verify(metadata).requestUpdate(true);
assertEquals(0, requestManager.requestsToSend());
}
@Test
public void testValidatePositionsFailureWithUnrecoverableAuthException() {
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
Optional.of(5));
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(5L,
Optional.of(10), leaderAndEpoch);
mockSuccessfulBuildRequestForValidatingPositions(position, LEADER_1);
requestManager.validatePositionsIfNeeded();
// Validate positions response with TopicAuthorizationException
NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
ClientResponse clientResponse =
buildOffsetsForLeaderEpochResponseWithErrors(unsentRequest, Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED));
clientResponse.onComplete();
assertTrue(unsentRequest.future().isDone());
assertFalse(unsentRequest.future().isCompletedExceptionally());
// Following validatePositions should raise the previous exception without performing any
// request
assertThrows(TopicAuthorizationException.class, () -> requestManager.validatePositionsIfNeeded());
assertEquals(0, requestManager.requestsToSend());
}
@Test
public void testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers() {
int currentOffset = 5;
Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.of(LEADER_1),
Optional.of(3));
SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(currentOffset,
Optional.of(10), leaderAndEpoch);
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
when(subscriptionState.position(any())).thenReturn(position, position);
// No api version info initially available
when(apiVersions.get(LEADER_1.idString())).thenReturn(null);
requestManager.validatePositionsIfNeeded();
assertEquals(0, requestManager.requestsToSend(), "Invalid request count");
verify(subscriptionState, never()).completeValidation(TEST_PARTITION_1);
verify(subscriptionState, never()).setNextAllowedRetry(any(), anyLong());
// Api version updated, next validate positions should successfully build the request
when(apiVersions.get(LEADER_1.idString())).thenReturn(NodeApiVersions.create());
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
when(subscriptionState.position(any())).thenReturn(position, position);
requestManager.validatePositionsIfNeeded();
assertEquals(1, requestManager.requestsToSend(), "Invalid request count");
}
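The last test above also documents a precondition of the validate path: without NodeApiVersions for the partition leader there is nothing to check the OffsetsForLeaderEpoch request against, so no request is built and the partition simply waits for a later validatePositionsIfNeeded() call. A minimal sketch of that gate (the helper name is hypothetical; the real check in OffsetsRequestManager may also verify the supported request version):

package org.apache.kafka.clients.consumer.internals;

// Sketch only: skip building an OffsetsForLeaderEpoch request while the leader's
// API versions are unknown; validation is retried once they become available.
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;

final class ApiVersionsGateExample {

    private ApiVersionsGateExample() {
    }

    static boolean canValidatePositionAgainst(ApiVersions apiVersions, Node leader) {
        NodeApiVersions nodeApiVersions = apiVersions.get(leader.idString());
        // Unknown node versions: abort quietly and leave the partition needing validation.
        return nodeApiVersions != null;
    }
}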
private void mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition position, Node leader) {
when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
when(subscriptionState.position(any())).thenReturn(position, position);
NodeApiVersions nodeApiVersions = NodeApiVersions.create();
when(apiVersions.get(leader.idString())).thenReturn(nodeApiVersions);
}
private void testResetPositionsSuccessWithLeaderEpoch(Metadata.LeaderAndEpoch leaderAndEpoch) {
TopicPartition tp = TEST_PARTITION_1;
Node leader = LEADER_1;
OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST;
long offset = 5L;
Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(tp,
new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch));
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp));
when(subscriptionState.resetStrategy(any())).thenReturn(strategy);
mockSuccessfulRequest(Collections.singletonMap(tp, leader));
requestManager.resetPositionsIfNeeded();
assertEquals(1, requestManager.requestsToSend());
// Reset positions response with offsets
when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader, leaderAndEpoch.epoch));
NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds());
NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
ClientResponse clientResponse = buildClientResponse(unsentRequest, expectedOffsets);
clientResponse.onComplete();
assertTrue(unsentRequest.future().isDone());
assertFalse(unsentRequest.future().isCompletedExceptionally());
}
private ListOffsetsResponseData.ListOffsetsTopicResponse mockUnknownOffsetResponse(
TopicPartition tp) {
return new ListOffsetsResponseData.ListOffsetsTopicResponse()
@@ -488,21 +687,21 @@ public class OffsetsRequestManagerTest {
NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds());
verifySuccessfulPollAwaitingResponse(retriedPoll);
NetworkClientDelegate.UnsentRequest unsentRequest = retriedPoll.unsentRequests.get(0);
ClientResponse clientResponse = buildClientResponse(unsentRequest,
expectedResult);
ClientResponse clientResponse = buildClientResponse(unsentRequest, expectedResult);
clientResponse.onComplete();
verifyRequestSuccessfullyCompleted(actualResult, expectedResult);
}
private void expectSuccessfulRequest(Map<TopicPartition, Node> partitionLeaders) {
private void mockSuccessfulRequest(Map<TopicPartition, Node> partitionLeaders) {
partitionLeaders.forEach((tp, broker) -> {
when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker));
when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker,
Metadata.LeaderAndEpoch.noLeaderOrEpoch().epoch));
when(subscriptionState.isAssigned(tp)).thenReturn(true);
});
when(metadata.fetch()).thenReturn(testClusterMetadata(partitionLeaders));
}
private void expectFailedRequest_MissingLeader() {
private void mockFailedRequest_MissingLeader() {
when(metadata.currentLeader(any(TopicPartition.class))).thenReturn(
new Metadata.LeaderAndEpoch(Optional.empty(), Optional.of(1)));
when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);
@@ -563,9 +762,8 @@ public class OffsetsRequestManagerTest {
assertEquals(expectedFailure, failure.getCause().getClass());
}
private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader) {
return new Metadata.LeaderAndEpoch(Optional.of(leader),
Optional.of(1));
private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader, Optional<Integer> epoch) {
return new Metadata.LeaderAndEpoch(Optional.of(leader), epoch);
}
private Cluster testClusterMetadata(Map<TopicPartition, Node> partitionLeaders) {
@@ -597,6 +795,75 @@ public class OffsetsRequestManagerTest {
return buildClientResponse(request, topicResponses, false, null);
}
private ClientResponse buildOffsetsForLeaderEpochResponse(
final NetworkClientDelegate.UnsentRequest request,
final List<TopicPartition> partitions,
final int endOffset) {
AbstractRequest abstractRequest = request.requestBuilder().build();
assertTrue(abstractRequest instanceof OffsetsForLeaderEpochRequest);
OffsetsForLeaderEpochRequest offsetsForLeaderEpochRequest = (OffsetsForLeaderEpochRequest) abstractRequest;
OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
partitions.forEach(tp -> {
OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic = data.topics().find(tp.topic());
if (topic == null) {
topic = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
data.topics().add(topic);
}
topic.partitions().add(new OffsetForLeaderEpochResponseData.EpochEndOffset()
.setPartition(tp.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(3)
.setEndOffset(endOffset));
});
OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(data);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, offsetsForLeaderEpochRequest.version(), "", 1),
request.callback(),
"-1",
time.milliseconds(),
time.milliseconds(),
false,
null,
null,
response
);
}
private ClientResponse buildOffsetsForLeaderEpochResponseWithErrors(
final NetworkClientDelegate.UnsentRequest request,
final Map<TopicPartition, Errors> partitionErrors) {
AbstractRequest abstractRequest = request.requestBuilder().build();
assertTrue(abstractRequest instanceof OffsetsForLeaderEpochRequest);
OffsetsForLeaderEpochRequest offsetsForLeaderEpochRequest = (OffsetsForLeaderEpochRequest) abstractRequest;
OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
partitionErrors.keySet().forEach(tp -> {
OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic = data.topics().find(tp.topic());
if (topic == null) {
topic = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
data.topics().add(topic);
}
topic.partitions().add(new OffsetForLeaderEpochResponseData.EpochEndOffset()
.setPartition(tp.partition())
.setErrorCode(partitionErrors.get(tp).code()));
});
OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(data);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, offsetsForLeaderEpochRequest.version(), "", 1),
request.callback(),
"-1",
time.milliseconds(),
time.milliseconds(),
false,
null,
null,
response
);
}
private ClientResponse buildClientResponse(
final NetworkClientDelegate.UnsentRequest request,
final List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses) {
