Browse Source

KAFKA-15534: Inject request completion time when the request failed (#14532)

Currently, we aren't able to access the request completion time if the request is completed exceptionally, which results in many system calls. This is not ideal because these system calls can add up. Instead, time is already retrieved on the top of the background thread event loop, which is then propagated into the NetworkClientDelegate.poll.

In this PR - I store the completion time in the handler, so that it becomes accessible in the callbacks.

Reviewer: Bruno Cadonna <cadonna@apache.org>
pull/14504/merge
Philip Nee 11 months ago committed by GitHub
parent
commit
c81a725219
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
  2. 10
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
  3. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
  4. 38
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
  5. 36
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
  6. 5
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
  7. 22
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
  8. 9
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
  9. 25
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
  10. 6
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
  11. 35
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java

61
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

@ -195,41 +195,41 @@ public class CommitRequestManager implements RequestManager { @@ -195,41 +195,41 @@ public class CommitRequestManager implements RequestManager {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
.getOrDefault(topicPartition.topic(),
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topicPartition.topic())
);
.getOrDefault(topicPartition.topic(),
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topicPartition.topic())
);
topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(topicPartition.partition())
.setCommittedOffset(offsetAndMetadata.offset())
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setCommittedMetadata(offsetAndMetadata.metadata())
.setPartitionIndex(topicPartition.partition())
.setCommittedOffset(offsetAndMetadata.offset())
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setCommittedMetadata(offsetAndMetadata.metadata())
);
requestTopicDataMap.put(topicPartition.topic(), topic);
}
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.groupId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
new OffsetCommitRequestData()
.setGroupId(this.groupId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator(),
(response, throwable) -> {
if (throwable == null) {
future.complete(null);
} else {
future.completeExceptionally(throwable);
}
});
builder,
coordinatorRequestManager.coordinator(),
(response, throwable) -> {
if (throwable == null) {
future.complete(null);
} else {
future.completeExceptionally(throwable);
}
});
}
}
private class OffsetFetchRequestState extends RequestState {
class OffsetFetchRequestState extends RequestState {
public final Set<TopicPartition> requestedPartitions;
public final GroupState.Generation requestedGeneration;
private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
@ -254,14 +254,14 @@ public class CommitRequestManager implements RequestManager { @@ -254,14 +254,14 @@ public class CommitRequestManager implements RequestManager {
new ArrayList<>(this.requestedPartitions),
throwOnFetchStableOffsetUnsupported);
return new NetworkClientDelegate.UnsentRequest(
builder,
coordinatorRequestManager.coordinator(),
(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
builder,
coordinatorRequestManager.coordinator(),
(r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody()));
}
public void onResponse(
final long currentTimeMs,
final OffsetFetchResponse response) {
final long currentTimeMs,
final OffsetFetchResponse response) {
Errors responseError = response.groupLevelError(groupState.groupId);
if (responseError != Errors.NONE) {
onFailure(currentTimeMs, responseError);
@ -279,7 +279,7 @@ public class CommitRequestManager implements RequestManager { @@ -279,7 +279,7 @@ public class CommitRequestManager implements RequestManager {
retry(currentTimeMs);
} else if (responseError == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), currentTimeMs);
retry(currentTimeMs);
} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
@ -290,7 +290,6 @@ public class CommitRequestManager implements RequestManager { @@ -290,7 +290,6 @@ public class CommitRequestManager implements RequestManager {
private void retry(final long currentTimeMs) {
onFailedAttempt(currentTimeMs);
onSendAttempt(currentTimeMs);
pendingRequests.addOffsetFetchRequest(this);
}

10
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java

@ -115,12 +115,11 @@ public class CoordinatorRequestManager implements RequestManager { @@ -115,12 +115,11 @@ public class CoordinatorRequestManager implements RequestManager {
);
unsentRequest.future().whenComplete((clientResponse, throwable) -> {
long responseTimeMs = time.milliseconds();
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(responseTimeMs, response);
onResponse(clientResponse.receivedTimeMs(), response);
} else {
onFailedResponse(responseTimeMs, throwable);
onFailedResponse(unsentRequest.handler().completionTimeMs(), throwable);
}
});
@ -165,10 +164,7 @@ public class CoordinatorRequestManager implements RequestManager { @@ -165,10 +164,7 @@ public class CoordinatorRequestManager implements RequestManager {
coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
}
private void onFailedResponse(
final long currentTimeMs,
final Throwable exception
) {
private void onFailedResponse(final long currentTimeMs, final Throwable exception) {
coordinatorRequestState.onFailedAttempt(currentTimeMs);
markCoordinatorUnknown("FindCoordinator failed with exception", currentTimeMs);

6
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java

@ -186,11 +186,9 @@ public class HeartbeatRequestManager implements RequestManager { @@ -186,11 +186,9 @@ public class HeartbeatRequestManager implements RequestManager {
coordinatorRequestManager.coordinator());
request.future().whenComplete((response, exception) -> {
if (response != null) {
onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs());
onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
} else {
// TODO: Currently, we lack a good way to propage the response time from the network client to the
// request handler. We will need to store the response time in the handler to make it accessible.
onFailure(exception, time.milliseconds());
onFailure(exception, request.handler().completionTimeMs());
}
});
return request;

38
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java

@ -91,7 +91,7 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -91,7 +91,7 @@ public class NetworkClientDelegate implements AutoCloseable {
pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
}
this.client.poll(pollTimeoutMs, currentTimeMs);
checkDisconnects();
checkDisconnects(currentTimeMs);
}
/**
@ -106,7 +106,7 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -106,7 +106,7 @@ public class NetworkClientDelegate implements AutoCloseable {
unsent.timer.update(currentTimeMs);
if (unsent.timer.isExpired()) {
iterator.remove();
unsent.handler.onFailure(new TimeoutException(
unsent.handler.onFailure(currentTimeMs, new TimeoutException(
"Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
continue;
}
@ -137,7 +137,7 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -137,7 +137,7 @@ public class NetworkClientDelegate implements AutoCloseable {
return true;
}
private void checkDisconnects() {
private void checkDisconnects(final long currentTimeMs) {
// Check the connection of the unsent request. Disconnect the disconnected node if it is unable to be connected.
Iterator<UnsentRequest> iter = unsentRequests.iterator();
while (iter.hasNext()) {
@ -145,7 +145,7 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -145,7 +145,7 @@ public class NetworkClientDelegate implements AutoCloseable {
if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
iter.remove();
AuthenticationException authenticationException = client.authenticationException(u.node.get());
u.handler.onFailure(authenticationException);
u.handler.onFailure(currentTimeMs, authenticationException);
}
}
}
@ -224,7 +224,7 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -224,7 +224,7 @@ public class NetworkClientDelegate implements AutoCloseable {
final Optional<Node> node,
final BiConsumer<ClientResponse, Throwable> callback) {
this(requestBuilder, node);
this.handler.future.whenComplete(callback);
this.handler.future().whenComplete(callback);
}
public void setTimer(final Time time, final long requestTimeoutMs) {
@ -235,7 +235,7 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -235,7 +235,7 @@ public class NetworkClientDelegate implements AutoCloseable {
return handler.future;
}
RequestCompletionHandler callback() {
FutureCompletionHandler handler() {
return handler;
}
@ -256,27 +256,39 @@ public class NetworkClientDelegate implements AutoCloseable { @@ -256,27 +256,39 @@ public class NetworkClientDelegate implements AutoCloseable {
public static class FutureCompletionHandler implements RequestCompletionHandler {
private long responseCompletionTimeMs;
private final CompletableFuture<ClientResponse> future;
FutureCompletionHandler() {
this.future = new CompletableFuture<>();
future = new CompletableFuture<>();
}
public void onFailure(final RuntimeException e) {
future.completeExceptionally(e);
public void onFailure(final long currentTimeMs, final RuntimeException e) {
this.responseCompletionTimeMs = currentTimeMs;
this.future.completeExceptionally(e);
}
public long completionTimeMs() {
return responseCompletionTimeMs;
}
@Override
public void onComplete(final ClientResponse response) {
long completionTimeMs = response.receivedTimeMs();
if (response.authenticationException() != null) {
onFailure(response.authenticationException());
onFailure(completionTimeMs, response.authenticationException());
} else if (response.wasDisconnected()) {
onFailure(DisconnectException.INSTANCE);
onFailure(completionTimeMs, DisconnectException.INSTANCE);
} else if (response.versionMismatch() != null) {
onFailure(response.versionMismatch());
onFailure(completionTimeMs, response.versionMismatch());
} else {
future.complete(response);
responseCompletionTimeMs = completionTimeMs;
this.future.complete(response);
}
}
public CompletableFuture<ClientResponse> future() {
return future;
}
}
}

36
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java

@ -148,30 +148,32 @@ public class TopicMetadataRequestManager implements RequestManager { @@ -148,30 +148,32 @@ public class TopicMetadataRequestManager implements RequestManager {
private NetworkClientDelegate.UnsentRequest createUnsentRequest(
final MetadataRequest.Builder request) {
return new NetworkClientDelegate.UnsentRequest(
request,
Optional.empty(),
this::processResponseOrException
);
}
NetworkClientDelegate.UnsentRequest unsent = new NetworkClientDelegate.UnsentRequest(
request,
Optional.empty());
unsent.future().whenComplete((response, exception) -> {
if (response == null) {
handleError(exception, unsent.handler().completionTimeMs());
} else {
handleResponse(response);
}
});
private void processResponseOrException(final ClientResponse response,
final Throwable exception) {
if (exception == null) {
handleResponse(response, response.receivedTimeMs());
return;
}
return unsent;
}
private void handleError(final Throwable exception,
final long completionTimeMs) {
if (exception instanceof RetriableException) {
// We continue to retry on RetriableException
// TODO: TimeoutException will continue to retry despite user API timeout.
onFailedAttempt(response.receivedTimeMs());
onFailedAttempt(completionTimeMs);
} else {
completeFutureAndRemoveRequest(new KafkaException(exception));
completeFutureAndRemoveRequest(exception);
}
}
private void handleResponse(final ClientResponse response, final long responseTimeMs) {
private void handleResponse(final ClientResponse response) {
long responseTimeMs = response.receivedTimeMs();
try {
Map<String, List<PartitionInfo>> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody());
future.complete(res);

5
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java

@ -328,7 +328,8 @@ public class CommitRequestManagerTest { @@ -328,7 +328,8 @@ public class CommitRequestManagerTest {
NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
res.unsentRequests.get(0).future().complete(buildOffsetFetchClientResponse(res.unsentRequests.get(0), partitions, error));
res.unsentRequests.get(0).future().complete(buildOffsetFetchClientResponse(res.unsentRequests.get(0),
partitions, error));
res = commitRequestManger.poll(time.milliseconds());
assertEquals(0, res.unsentRequests.size());
return futures;
@ -391,7 +392,7 @@ public class CommitRequestManagerTest { @@ -391,7 +392,7 @@ public class CommitRequestManagerTest {
new OffsetFetchResponse(error, topicPartitionData);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1),
request.callback(),
request.handler(),
"-1",
time.milliseconds(),
time.milliseconds(),

22
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java

@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
@ -145,6 +146,25 @@ public class CoordinatorRequestManagerTest { @@ -145,6 +146,25 @@ public class CoordinatorRequestManagerTest {
assertEquals(this.node.id(), respNew.coordinatorByKey(GROUP_ID).get().nodeId());
}
@Test
public void testNetworkTimeout() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
NetworkClientDelegate.PollResult res = coordinatorManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
// Mimic a network timeout
res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException());
// Sleep for exponential backoff - 1ms
time.sleep(RETRY_BACKOFF_MS - 1);
NetworkClientDelegate.PollResult res2 = coordinatorManager.poll(this.time.milliseconds());
assertEquals(0, res2.unsentRequests.size());
time.sleep(1);
res2 = coordinatorManager.poll(time.milliseconds());
assertEquals(1, res2.unsentRequests.size());
}
private void expectFindCoordinatorRequest(
CoordinatorRequestManager coordinatorManager,
Errors error
@ -182,7 +202,7 @@ public class CoordinatorRequestManagerTest { @@ -182,7 +202,7 @@ public class CoordinatorRequestManagerTest {
FindCoordinatorResponse.prepareResponse(error, GROUP_ID, node);
return new ClientResponse(
new RequestHeader(ApiKeys.FIND_COORDINATOR, findCoordinatorRequest.version(), "", 1),
request.callback(),
request.handler(),
node.idString(),
time.milliseconds(),
time.milliseconds(),

9
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java

@ -171,7 +171,7 @@ public class HeartbeatRequestManagerTest { @@ -171,7 +171,7 @@ public class HeartbeatRequestManagerTest {
}
@Test
public void testBackoffOnTimeout() {
public void testNetworkTimeout() {
heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
logContext,
time,
@ -184,7 +184,8 @@ public class HeartbeatRequestManagerTest { @@ -184,7 +184,8 @@ public class HeartbeatRequestManagerTest {
when(membershipManager.shouldSendHeartbeat()).thenReturn(true);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
result.unsentRequests.get(0).future().completeExceptionally(new TimeoutException("timeout"));
// Mimic network timeout
result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout"));
// Assure the manager will backoff on timeout
time.sleep(RETRY_BACKOFF_MS - 1);
@ -298,7 +299,7 @@ public class HeartbeatRequestManagerTest { @@ -298,7 +299,7 @@ public class HeartbeatRequestManagerTest {
ClientResponse response = createHeartbeatResponse(
result.unsentRequests.get(0),
error);
result.unsentRequests.get(0).future().complete(response);
result.unsentRequests.get(0).handler().onComplete(response);
ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody();
switch (error) {
@ -385,7 +386,7 @@ public class HeartbeatRequestManagerTest { @@ -385,7 +386,7 @@ public class HeartbeatRequestManagerTest {
ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(data);
return new ClientResponse(
new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
request.callback(),
request.handler(),
"0",
time.milliseconds(),
time.milliseconds(),

25
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
@ -41,8 +42,11 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; @@ -41,8 +42,11 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class NetworkClientDelegateTest {
private static final int REQUEST_TIMEOUT_MS = 5000;
@ -97,6 +101,27 @@ public class NetworkClientDelegateTest { @@ -97,6 +101,27 @@ public class NetworkClientDelegateTest {
}
}
@Test
public void testEnsureCorrectCompletionTimeOnFailure() {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
long timeMs = time.milliseconds();
unsentRequest.handler().onFailure(timeMs, new TimeoutException());
time.sleep(100);
assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
}
@Test
public void testEnsureCorrectCompletionTimeOnComplete() {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
long timeMs = time.milliseconds();
final ClientResponse response = mock(ClientResponse.class);
when(response.receivedTimeMs()).thenReturn(timeMs);
unsentRequest.handler().onComplete(response);
time.sleep(100);
assertEquals(timeMs, unsentRequest.handler().completionTimeMs());
}
public NetworkClientDelegate newNetworkClientDelegate() {
LogContext logContext = new LogContext();
Properties properties = new Properties();

6
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java

@ -820,7 +820,7 @@ public class OffsetsRequestManagerTest { @@ -820,7 +820,7 @@ public class OffsetsRequestManagerTest {
OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(data);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, offsetsForLeaderEpochRequest.version(), "", 1),
request.callback(),
request.handler(),
"-1",
time.milliseconds(),
time.milliseconds(),
@ -853,7 +853,7 @@ public class OffsetsRequestManagerTest { @@ -853,7 +853,7 @@ public class OffsetsRequestManagerTest {
OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(data);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_FOR_LEADER_EPOCH, offsetsForLeaderEpochRequest.version(), "", 1),
request.callback(),
request.handler(),
"-1",
time.milliseconds(),
time.milliseconds(),
@ -902,7 +902,7 @@ public class OffsetsRequestManagerTest { @@ -902,7 +902,7 @@ public class OffsetsRequestManagerTest {
ListOffsetsResponse response = buildListOffsetsResponse(topicResponses);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1),
request.callback(),
request.handler(),
"-1",
time.milliseconds(),
time.milliseconds(),

35
clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java

@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@ -135,7 +136,7 @@ public class TopicMetadataRequestManagerTest { @@ -135,7 +136,7 @@ public class TopicMetadataRequestManagerTest {
@ParameterizedTest
@MethodSource("hardFailureExceptionProvider")
void testHardFailures(Exception exception) {
public void testHardFailures(Exception exception) {
Optional<String> topic = Optional.of("hello");
this.topicMetadataRequestManager.requestTopicMetadata(topic);
@ -151,6 +152,36 @@ public class TopicMetadataRequestManagerTest { @@ -151,6 +152,36 @@ public class TopicMetadataRequestManagerTest {
}
}
@Test
public void testNetworkTimeout() {
Optional<String> topic = Optional.of("hello");
topicMetadataRequestManager.requestTopicMetadata(topic);
NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
assertEquals(1, res.unsentRequests.size());
NetworkClientDelegate.PollResult res2 = this.topicMetadataRequestManager.poll(this.time.milliseconds());
assertEquals(0, res2.unsentRequests.size());
// Mimic a network timeout
res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException());
long backoffMs = topicMetadataRequestManager.inflightRequests().get(0).remainingBackoffMs(time.milliseconds());
// Sleep for exponential backoff - 1ms
time.sleep(backoffMs - 1);
res2 = topicMetadataRequestManager.poll(this.time.milliseconds());
assertEquals(0, res2.unsentRequests.size());
time.sleep(1);
res2 = topicMetadataRequestManager.poll(this.time.milliseconds());
assertEquals(1, res2.unsentRequests.size());
res2.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
res2.unsentRequests.get(0),
topic,
Errors.NONE));
assertTrue(topicMetadataRequestManager.inflightRequests().isEmpty());
}
private ClientResponse buildTopicMetadataClientResponse(
final NetworkClientDelegate.UnsentRequest request,
final Optional<String> topic,
@ -176,7 +207,7 @@ public class TopicMetadataRequestManagerTest { @@ -176,7 +207,7 @@ public class TopicMetadataRequestManagerTest {
topics);
return new ClientResponse(
new RequestHeader(ApiKeys.METADATA, metadataRequest.version(), "mockClientId", 1),
request.callback(),
request.handler(),
"-1",
time.milliseconds(),
time.milliseconds(),

Loading…
Cancel
Save