diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index af47e55d75a..d18c0ed9e89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -141,6 +141,10 @@ public class CommonClientConfigs {
+ "The value must be set lower than session.timeout.ms
, but typically should be set no higher "
+ "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+ public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
+ public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
+ "This configuration is used as the default timeout for all client operations that do not specify a timeout
parameter.";
+
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
index ccccf118a1a..2312fe4b81d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
@@ -26,7 +26,7 @@ public abstract class AbstractOptions {
protected Integer timeoutMs = null;
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*/
@SuppressWarnings("unchecked")
@@ -36,7 +36,7 @@ public abstract class AbstractOptions {
}
/**
- * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * The timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*/
public Integer timeoutMs() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 107eb56d63a..ad62f1fbcd6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
@@ -107,6 +108,7 @@ public class AdminClientConfig extends AbstractConfig {
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
+ public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
/**
* security.providers
@@ -143,7 +145,7 @@ public class AdminClientConfig extends AbstractConfig {
RETRY_BACKOFF_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
- 120000,
+ 30000,
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
@@ -154,10 +156,16 @@ public class AdminClientConfig extends AbstractConfig {
CONNECTIONS_MAX_IDLE_MS_DOC)
.define(RETRIES_CONFIG,
Type.INT,
- 5,
- atLeast(0),
+ Integer.MAX_VALUE,
+ between(0, Integer.MAX_VALUE),
Importance.LOW,
CommonClientConfigs.RETRIES_DOC)
+ .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
+ Type.INT,
+ 60000,
+ atLeast(0),
+ Importance.MEDIUM,
+ CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index 0b280532104..fc933c4b1db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -32,7 +32,7 @@ public class AlterConfigsOptions extends AbstractOptions {
private boolean validateOnly = false;
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
index bfb8e32db15..ad4ae74ff26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
@@ -30,7 +30,7 @@ import java.util.Collection;
public class CreateAclsOptions extends AbstractOptions {
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
index a9f1009c2fc..cd03bc62e24 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -32,7 +32,7 @@ public class CreateTopicsOptions extends AbstractOptions {
private boolean validateOnly = false;
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
index 1b67da52f38..7c250e10b3e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
@@ -30,7 +30,7 @@ import java.util.Collection;
public class DeleteAclsOptions extends AbstractOptions {
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
index 91e38a196fc..def02867b67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -30,7 +30,7 @@ import java.util.Collection;
public class DeleteTopicsOptions extends AbstractOptions {
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
index b17d6a7d0cb..e44d58473df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
public class DescribeAclsOptions extends AbstractOptions {
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
index 670feda0d26..2eac1f055f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -30,7 +30,7 @@ public class DescribeClusterOptions extends AbstractOptions requestBuilder;
try {
- requestBuilder = call.createRequest(timeoutMs);
+ requestBuilder = call.createRequest(requestTimeoutMs);
} catch (Throwable throwable) {
call.fail(now, new KafkaException(String.format(
"Internal error sending %s to %s.", call.callName, node)));
continue;
}
- ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
+ ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
+ true, requestTimeoutMs, null);
log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
client.send(clientRequest, now);
getOrCreateListValue(callsInFlight, node.idString()).add(call);
@@ -1300,7 +1330,7 @@ public class KafkaAdminClient extends AdminClient {
* Create a new metadata call.
*/
private Call makeMetadataCall(long now) {
- return new Call(true, "fetchMetadata", calcDeadlineMs(now, defaultTimeoutMs),
+ return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
public MetadataRequest.Builder createRequest(int timeoutMs) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index e288e1828fd..5a8f2b4ad15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -30,7 +30,7 @@ public class ListTopicsOptions extends AbstractOptions {
private boolean listInternal = false;
/**
- * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
+ * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 67897cfd54f..ce7a595e586 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -222,8 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
/** default.api.timeout.ms
*/
- public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
- public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a timeout
parameter.";
+ public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
/** interceptor.classes
*/
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
@@ -447,7 +446,7 @@ public class ConsumerConfig extends AbstractConfig {
60 * 1000,
atLeast(0),
Importance.MEDIUM,
- DEFAULT_API_TIMEOUT_MS_DOC)
+ CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cca563b8359..f82fddde248 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -108,11 +109,11 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
-import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
-import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -123,9 +124,7 @@ import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -144,6 +143,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -178,6 +178,14 @@ public class KafkaAdminClientTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
+ @Test
+ public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
+ final AdminClientConfig config = newConfMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500");
+ KafkaException exception = assertThrows(KafkaException.class,
+ () -> KafkaAdminClient.createInternal(config, null));
+ assertTrue(exception.getCause() instanceof ConfigException);
+ }
+
@Test
public void testGetOrCreateListValue() {
Map> map = new HashMap<>();
@@ -591,7 +599,7 @@ public class KafkaAdminClientTest {
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
- AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
+ AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.RETRIES_CONFIG, "0"))) {
// The first request fails with a disconnect
@@ -848,54 +856,6 @@ public class KafkaAdminClientTest {
}
}
- /**
- * Test handling timeouts.
- */
- @Ignore // The test is flaky. Should be renabled when this JIRA is fixed: https://issues.apache.org/jira/browse/KAFKA-5792
- @Test
- public void testHandleTimeout() throws Exception {
- MockTime time = new MockTime();
- try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
- mockCluster(1, 0),
- AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
- AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
- env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- assertEquals(time, env.time());
- assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());
-
- // Make a request with an extremely short timeout.
- // Then wait for it to fail by not supplying any response.
- log.info("Starting AdminClient#listTopics...");
- final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000));
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return env.kafkaClient().hasInFlightRequests();
- }
- }, "Timed out waiting for inFlightRequests");
- time.sleep(5000);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return result.listings().isDone();
- }
- }, "Timed out waiting for listTopics to complete");
- TestUtils.assertFutureError(result.listings(), TimeoutException.class);
- log.info("Verified the error result of AdminClient#listTopics");
-
- // The next request should succeed.
- time.sleep(5000);
- env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
- Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "foo"),
- new DescribeConfigsResponse.Config(ApiError.NONE,
- Collections.emptySet()))));
- DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
- new ConfigResource(ConfigResource.Type.TOPIC, "foo")));
- time.sleep(5000);
- result2.values().get(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get();
- }
- }
-
@Test
public void testDescribeConfigs() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -2554,6 +2514,146 @@ public class KafkaAdminClientTest {
errorsMap, memberIdentities.get(1), "For unit test").getClass());
}
+ @Test
+ public void testSuccessfulRetryAfterRequestTimeout() throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockTime time = new MockTime();
+ Node node0 = new Node(0, "localhost", 8121);
+ nodes.put(0, node0);
+ Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+ Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
+ Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), nodes.get(0));
+
+ final int requestTimeoutMs = 1000;
+ final int retryBackoffMs = 100;
+ final int apiTimeoutMs = 3000;
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+ AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ final ListTopicsResult result = env.adminClient()
+ .listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs));
+
+ // Wait until the first attempt has been sent, then advance the time
+ TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
+ "Timed out waiting for Metadata request to be sent");
+ time.sleep(requestTimeoutMs + 1);
+
+ // Wait for the request to be timed out before backing off
+ TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(),
+ "Timed out waiting for inFlightRequests to be timed out");
+ time.sleep(retryBackoffMs);
+
+ // Since api timeout bound is not hit, AdminClient should retry
+ TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
+ "Failed to retry Metadata request");
+ env.kafkaClient().respond(prepareMetadataResponse(cluster, Errors.NONE));
+
+ assertEquals(1, result.listings().get().size());
+ assertEquals("foo", result.listings().get().iterator().next().name());
+ }
+ }
+
+ @Test
+ public void testDefaultApiTimeout() throws Exception {
+ testApiTimeout(1500, 3000, OptionalInt.empty());
+ }
+
+ @Test
+ public void testDefaultApiTimeoutOverride() throws Exception {
+ testApiTimeout(1500, 10000, OptionalInt.of(3000));
+ }
+
+ private void testApiTimeout(int requestTimeoutMs,
+ int defaultApiTimeoutMs,
+ OptionalInt overrideApiTimeoutMs) throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockTime time = new MockTime();
+ Node node0 = new Node(0, "localhost", 8121);
+ nodes.put(0, node0);
+ Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+ Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
+ Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), nodes.get(0));
+
+ final int retryBackoffMs = 100;
+ final int effectiveTimeoutMs = overrideApiTimeoutMs.orElse(defaultApiTimeoutMs);
+ assertEquals("This test expects the effective timeout to be twice the request timeout",
+ 2 * requestTimeoutMs, effectiveTimeoutMs);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+ AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs),
+ AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeoutMs))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ ListTopicsOptions options = new ListTopicsOptions();
+ overrideApiTimeoutMs.ifPresent(options::timeoutMs);
+
+ final ListTopicsResult result = env.adminClient().listTopics(options);
+
+ // Wait until the first attempt has been sent, then advance the time
+ TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
+ "Timed out waiting for Metadata request to be sent");
+ time.sleep(requestTimeoutMs + 1);
+
+ // Wait for the request to be timed out before backing off
+ TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(),
+ "Timed out waiting for inFlightRequests to be timed out");
+ time.sleep(retryBackoffMs);
+
+ // Since api timeout bound is not hit, AdminClient should retry
+ TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
+ "Timed out waiting for Metadata request to be sent");
+ time.sleep(requestTimeoutMs + 1);
+
+ TestUtils.assertFutureThrows(result.future, TimeoutException.class);
+ }
+ }
+
+ @Test
+ public void testRequestTimeoutExceedingDefaultApiTimeout() throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockTime time = new MockTime();
+ Node node0 = new Node(0, "localhost", 8121);
+ nodes.put(0, node0);
+ Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+ Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
+ Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(), nodes.get(0));
+
+ // This test assumes the default api timeout value of 60000. When the request timeout
+ // is set to something larger, we should adjust the api timeout accordingly for compatibility.
+
+ final int retryBackoffMs = 100;
+ final int requestTimeoutMs = 120000;
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+ AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ ListTopicsOptions options = new ListTopicsOptions();
+
+ final ListTopicsResult result = env.adminClient().listTopics(options);
+
+ // Wait until the first attempt has been sent, then advance the time by the default api timeout
+ TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
+ "Timed out waiting for Metadata request to be sent");
+ time.sleep(60001);
+
+ // The in-flight request should not be cancelled
+ assertTrue(env.kafkaClient().hasInFlightRequests());
+
+ // Now sleep the remaining time for the request timeout to expire
+ time.sleep(60000);
+ TestUtils.assertFutureThrows(result.future, TimeoutException.class);
+ }
+ }
+
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index 3e30a474f28..e71702ab582 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -79,7 +79,8 @@ object LeaderElectionCommand extends Logging {
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
commandOptions.options.valueOf(commandOptions.bootstrapServer)
)
- props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
+ props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
+ props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
Admin.create(props)
}
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 2c0eea050be..5ddf047b352 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -74,6 +74,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
new Properties()
adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt))
adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString)
+ adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (timeout * 2).toString)
new AdminClientCommand(adminProps)
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index ca3c5a8c107..c33df2dafb6 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -986,7 +986,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
def testCallInFlightTimeouts(): Unit = {
val config = createConfig()
- config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000")
+ config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory()
client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 5f987b805cc..142dbca5942 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -238,7 +238,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
private def createAdminClient: Admin = {
val config = createConfig()
- config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000")
+ config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000")
val client = Admin.create(config)
adminClients += client
client
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index e328f2afb20..1b8069ac26d 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -309,7 +309,8 @@ object LeaderElectionCommandTest {
def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = {
Map(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers),
- AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000"
+ AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000",
+ AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000"
)
}
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 1108d999bd7..1cbf33e0f0b 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -67,6 +67,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+ props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "15000")
Admin.create(props)
}