TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager builds the API requests and sends them when ready. Requests for the same topic are chained onto the existing inflight request, so duplicate requests are not sent repeatedly.

Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>

pull/14435/head
Philip Nee committed 1 year ago (committed by GitHub)
15 changed files with 636 additions and 93 deletions
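As a quick illustration of the chaining behavior described in the commit message, here is a minimal sketch (not part of this change); the `config` and `time` variables are assumed to be a ConsumerConfig and a MockTime already in scope:

    // Duplicate requests for the same topic share a single inflight entry and future.
    TopicMetadataRequestManager manager =
        new TopicMetadataRequestManager(new LogContext(), config);
    CompletableFuture<Map<String, List<PartitionInfo>>> first =
        manager.requestTopicMetadata(Optional.of("orders"));
    CompletableFuture<Map<String, List<PartitionInfo>>> second =
        manager.requestTopicMetadata(Optional.of("orders"));
    // `second` is the same future instance as `first`; only one request state is created.
    NetworkClientDelegate.PollResult result = manager.poll(time.milliseconds());
    // result.unsentRequests contains at most one MetadataRequest for "orders".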
TopicMetadataRequestManager.java
@@ -0,0 +1,233 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * <p>
 * Manages the state of topic metadata requests. This manager returns a
 * {@link NetworkClientDelegate.PollResult} when a request is ready to
 * be sent. Specifically, this manager handles the following user API calls:
 * </p>
 * <ul>
 * <li>listTopics</li>
 * <li>partitionsFor</li>
 * </ul>
 * <p>
 * The manager checks the state of each {@link TopicMetadataRequestState} before sending a new request,
 * so that a retry is only sent after backing off from previous attempts.
 * It also tracks inflight requests to avoid overwhelming the broker with duplicate requests.
 * The {@code inflightRequests} map is keyed by topic name; if all topics are requested,
 * {@code Optional.empty()} is used as the key.
 * Once a request completes successfully, its corresponding entry is removed.
 * </p>
 */
public class TopicMetadataRequestManager implements RequestManager {
    private final boolean allowAutoTopicCreation;
    private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
    private final long retryBackoffMs;
    private final long retryBackoffMaxMs;
    private final Logger log;
    private final LogContext logContext;

    public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) {
        logContext = context;
        log = logContext.logger(getClass());
        inflightRequests = new HashMap<>();
        retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
        retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
        allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
    }

    @Override
    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
        List<NetworkClientDelegate.UnsentRequest> requests = inflightRequests.values().stream()
            .map(req -> req.send(currentTimeMs))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
        return requests.isEmpty() ?
            new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) :
            new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests));
    }

    /**
     * Return the future of the metadata request. If a request for the same topic is already
     * inflight, return the existing future instead of creating a new request.
     *
     * @param topic the topic to request; if empty, request metadata for all topics.
     * @return the future of the metadata request.
     */
    public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(final Optional<String> topic) {
        if (inflightRequests.containsKey(topic)) {
            return inflightRequests.get(topic).future;
        }

        TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
            logContext,
            topic,
            retryBackoffMs,
            retryBackoffMaxMs);
        inflightRequests.put(topic, newRequest);
        return newRequest.future;
    }

    // Visible for testing
    List<TopicMetadataRequestState> inflightRequests() {
        return new ArrayList<>(inflightRequests.values());
    }

    class TopicMetadataRequestState extends RequestState {
        private final Optional<String> topic;
        CompletableFuture<Map<String, List<PartitionInfo>>> future;

        public TopicMetadataRequestState(final LogContext logContext,
                                         final Optional<String> topic,
                                         final long retryBackoffMs,
                                         final long retryBackoffMaxMs) {
            super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs,
                retryBackoffMaxMs);
            future = new CompletableFuture<>();
            this.topic = topic;
        }

        /**
         * Prepare the metadata request and return an
         * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed.
         */
        private Optional<NetworkClientDelegate.UnsentRequest> send(final long currentTimeMs) {
            if (!canSendRequest(currentTimeMs)) {
                return Optional.empty();
            }
            onSendAttempt(currentTimeMs);

            final MetadataRequest.Builder request =
                topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation))
                    .orElseGet(MetadataRequest.Builder::allTopics);

            return Optional.of(createUnsentRequest(request));
        }

        private NetworkClientDelegate.UnsentRequest createUnsentRequest(
            final MetadataRequest.Builder request) {
            return new NetworkClientDelegate.UnsentRequest(
                request,
                Optional.empty(),
                this::processResponseOrException
            );
        }

        private void processResponseOrException(final ClientResponse response,
                                                final Throwable exception) {
            if (exception == null) {
                handleResponse(response, response.receivedTimeMs());
                return;
            }

            if (exception instanceof RetriableException) {
                // We continue to retry on RetriableException.
                // TODO: TimeoutException will continue to retry despite user API timeout.
                onFailedAttempt(response.receivedTimeMs());
            } else {
                completeFutureAndRemoveRequest(new KafkaException(exception));
            }
        }

        private void handleResponse(final ClientResponse response, final long responseTimeMs) {
            try {
                Map<String, List<PartitionInfo>> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody());
                future.complete(res);
                inflightRequests.remove(topic);
            } catch (RetriableException e) {
                onFailedAttempt(responseTimeMs);
            } catch (Exception e) {
                completeFutureAndRemoveRequest(e);
            }
        }

        private void completeFutureAndRemoveRequest(final Throwable throwable) {
            future.completeExceptionally(throwable);
            inflightRequests.remove(topic);
        }

        private Map<String, List<PartitionInfo>> handleTopicMetadataResponse(final MetadataResponse response) {
            Cluster cluster = response.buildCluster();

            final Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
            if (!unauthorizedTopics.isEmpty())
                throw new TopicAuthorizationException(unauthorizedTopics);

            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty()) {
                // If there were errors, we need to check whether they were fatal or whether
                // we should just retry.
                log.debug("Topic metadata fetch included errors: {}", errors);

                for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
                    String topic = errorEntry.getKey();
                    Errors error = errorEntry.getValue();

                    if (error == Errors.INVALID_TOPIC_EXCEPTION) {
                        throw new InvalidTopicException("Topic '" + topic + "' is invalid");
                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        // If a requested topic is unknown, we just continue and let it be absent
                        // in the returned map.
                        continue;
                    } else if (error.exception() instanceof RetriableException) {
                        throw error.exception();
                    } else {
                        throw new KafkaException("Unexpected error fetching metadata for topic " + topic,
                            error.exception());
                    }
                }
            }

            HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
            for (String topic : cluster.topics())
                topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
            return topicsPartitionInfos;
        }

        public Optional<String> topic() {
            return topic;
        }
    }
}
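To make the keying convention from the class Javadoc concrete, here is a short sketch of assumed caller-side code (not part of this diff); `manager` is assumed to be a TopicMetadataRequestManager instance:

    // A single-topic lookup (partitionsFor) is keyed by Optional.of(topic);
    // an all-topics lookup (listTopics) is keyed by Optional.empty().
    CompletableFuture<Map<String, List<PartitionInfo>>> one =
        manager.requestTopicMetadata(Optional.of("orders"));
    CompletableFuture<Map<String, List<PartitionInfo>>> all =
        manager.requestTopicMetadata(Optional.empty());
    one.thenAccept(topics ->
        topics.getOrDefault("orders", Collections.emptyList())
            .forEach(partition -> System.out.println(partition)));
    // After a future completes successfully its inflight entry is removed, so a
    // later requestTopicMetadata(Optional.of("orders")) starts a fresh request.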
TopicMetadataApplicationEvent.java
@@ -0,0 +1,57 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class TopicMetadataApplicationEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
    private final String topic;

    public TopicMetadataApplicationEvent(final String topic) {
        super(Type.TOPIC_METADATA);
        this.topic = topic;
    }

    public String topic() {
        return topic;
    }

    @Override
    public String toString() {
        return "TopicMetadataApplicationEvent(topic=" + topic + ")";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicMetadataApplicationEvent)) return false;
        if (!super.equals(o)) return false;

        TopicMetadataApplicationEvent that = (TopicMetadataApplicationEvent) o;

        return topic.equals(that.topic);
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + topic.hashCode();
        return result;
    }
}
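For context, a hedged sketch of how the application thread might use this event. The handoff queue and the future() accessor on CompletableApplicationEvent are assumptions here, since that plumbing is outside this file:

    TopicMetadataApplicationEvent event = new TopicMetadataApplicationEvent("orders");
    applicationEventQueue.add(event); // assumed queue to the consumer's background thread
    // The background thread is assumed to route Type.TOPIC_METADATA to
    // TopicMetadataRequestManager.requestTopicMetadata(Optional.of(event.topic()))
    // and complete the event's future with the resulting map.
    Map<String, List<PartitionInfo>> partitions =
        event.future().get(30, TimeUnit.SECONDS); // throws checked exceptions; handle or declare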
TopicMetadataRequestManagerTest.java
@@ -0,0 +1,221 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.spy;

public class TopicMetadataRequestManagerTest {
    private MockTime time;
    private TopicMetadataRequestManager topicMetadataRequestManager;

    @BeforeEach
    public void setup() {
        this.time = new MockTime();
        Properties props = new Properties();
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        props.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(
            new LogContext(),
            new ConsumerConfig(props)));
    }

    @ParameterizedTest
    @MethodSource("topicsProvider")
    public void testPoll_SuccessfulRequestTopicMetadata(Optional<String> topic) {
        this.topicMetadataRequestManager.requestTopicMetadata(topic);
        this.time.sleep(100);
        NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
        assertEquals(1, res.unsentRequests.size());
    }

    @ParameterizedTest
    @MethodSource("exceptionProvider")
    public void testExceptionAndInflightRequests(final Errors error, final boolean shouldRetry) {
        String topic = "hello";
        this.topicMetadataRequestManager.requestTopicMetadata(Optional.of(topic));
        this.time.sleep(100);
        NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
        res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
            res.unsentRequests.get(0),
            Optional.of(topic),
            error));
        List<TopicMetadataRequestManager.TopicMetadataRequestState> inflights = this.topicMetadataRequestManager.inflightRequests();

        if (shouldRetry) {
            assertEquals(1, inflights.size());
            assertEquals(topic, inflights.get(0).topic().orElse(null));
        } else {
            assertEquals(0, inflights.size());
        }
    }

    @ParameterizedTest
    @MethodSource("topicsProvider")
    public void testSendingTheSameRequest(Optional<String> topic) {
        CompletableFuture<Map<String, List<PartitionInfo>>> future = this.topicMetadataRequestManager.requestTopicMetadata(topic);
        CompletableFuture<Map<String, List<PartitionInfo>>> future2 =
            this.topicMetadataRequestManager.requestTopicMetadata(topic);
        this.time.sleep(100);
        NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
        assertEquals(1, res.unsentRequests.size());

        res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse(
            res.unsentRequests.get(0),
            topic,
            Errors.NONE));

        assertTrue(future.isDone());
        assertFalse(future.isCompletedExceptionally());
        try {
            future.get();
        } catch (Throwable e) {
            fail("Expected to succeed, but got: " + e);
        }
        assertTrue(future2.isDone());
        assertFalse(future2.isCompletedExceptionally());
    }

    @ParameterizedTest
    @MethodSource("hardFailureExceptionProvider")
    void testHardFailures(Exception exception) {
        Optional<String> topic = Optional.of("hello");

        this.topicMetadataRequestManager.requestTopicMetadata(topic);
        NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds());
        assertEquals(1, res.unsentRequests.size());

        res.unsentRequests.get(0).future().completeExceptionally(exception);

        if (exception instanceof RetriableException) {
            assertFalse(topicMetadataRequestManager.inflightRequests().isEmpty());
        } else {
            assertTrue(topicMetadataRequestManager.inflightRequests().isEmpty());
        }
    }

    private ClientResponse buildTopicMetadataClientResponse(
        final NetworkClientDelegate.UnsentRequest request,
        final Optional<String> topic,
        final Errors error) {
        AbstractRequest abstractRequest = request.requestBuilder().build();
        assertTrue(abstractRequest instanceof MetadataRequest);
        MetadataRequest metadataRequest = (MetadataRequest) abstractRequest;
        Cluster cluster = mockCluster(3, 0);
        List<MetadataResponse.TopicMetadata> topics = new ArrayList<>();
        if (topic.isPresent()) {
            topics.add(new MetadataResponse.TopicMetadata(error, topic.get(), false,
                Collections.emptyList()));
        } else {
            // An empty topic means the request was for all topics.
            topics.add(new MetadataResponse.TopicMetadata(error, "topic1", false,
                Collections.emptyList()));
            topics.add(new MetadataResponse.TopicMetadata(error, "topic2", false,
                Collections.emptyList()));
        }
        final MetadataResponse metadataResponse = RequestTestUtils.metadataResponse(cluster.nodes(),
            cluster.clusterResource().clusterId(),
            cluster.controller().id(),
            topics);
        return new ClientResponse(
            new RequestHeader(ApiKeys.METADATA, metadataRequest.version(), "mockClientId", 1),
            request.callback(),
            "-1",
            time.milliseconds(),
            time.milliseconds(),
            false,
            null,
            null,
            metadataResponse);
    }

    private static Cluster mockCluster(final int numNodes, final int controllerIndex) {
        HashMap<Integer, Node> nodes = new HashMap<>();
        for (int i = 0; i < numNodes; i++)
            nodes.put(i, new Node(i, "localhost", 8121 + i));
        return new Cluster("mockClusterId", nodes.values(),
            Collections.emptySet(), Collections.emptySet(),
            Collections.emptySet(), nodes.get(controllerIndex));
    }

    private static Collection<Arguments> topicsProvider() {
        return Arrays.asList(
            Arguments.of(Optional.of("topic1")),
            Arguments.of(Optional.empty()));
    }

    private static Collection<Arguments> exceptionProvider() {
        return Arrays.asList(
            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
            Arguments.of(Errors.INVALID_TOPIC_EXCEPTION, false),
            Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
            Arguments.of(Errors.NETWORK_EXCEPTION, true),
            Arguments.of(Errors.NONE, false));
    }

    private static Collection<Arguments> hardFailureExceptionProvider() {
        return Arrays.asList(
            Arguments.of(new TimeoutException("timeout")),
            Arguments.of(new KafkaException("non-retriable exception")),
            Arguments.of(new NetworkException("retriable-exception")));
    }
}
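One behavior these tests rely on but do not assert directly: after a retriable failure the request state stays inflight, and the next send attempt is gated by the retry backoff. A sketch of that expectation, assuming RequestState's standard backoff gating (the exact delay may vary with jitter):

    // Complete the first attempt with a retriable error, e.g. Errors.NETWORK_EXCEPTION, then:
    assertEquals(0, topicMetadataRequestManager.poll(time.milliseconds()).unsentRequests.size()); // still backing off
    time.sleep(100); // advance past RETRY_BACKOFF_MS_CONFIG (100 ms in setup())
    assertEquals(1, topicMetadataRequestManager.poll(time.milliseconds()).unsentRequests.size()); // retry is sent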