Browse Source

KAFKA-8503; Add default api timeout to AdminClient (KIP-533) (#8011)

This PR implements `default.api.timeout.ms` as documented by KIP-533. This is a rebased version of #6913 with some additional test cases and small cleanups.

Reviewers: David Arthur <mumrah@gmail.com>

Co-authored-by: huxi <huxi_2b@hotmail.com>
pull/8029/head
Jason Gustafson 5 years ago committed by GitHub
parent
commit
4317325fbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
  2. 4
      clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
  3. 14
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
  4. 2
      clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
  5. 2
      clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java
  6. 2
      clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
  7. 2
      clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java
  8. 2
      clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
  9. 2
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java
  10. 2
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
  11. 2
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
  12. 2
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
  13. 48
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  14. 2
      clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
  15. 5
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  16. 206
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  17. 3
      core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
  18. 1
      core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
  19. 2
      core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
  20. 2
      core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
  21. 3
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
  22. 1
      core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala

4
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java

@ -141,6 +141,10 @@ public class CommonClientConfigs { @@ -141,6 +141,10 @@ public class CommonClientConfigs {
+ "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
+ "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.";
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.

4
clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java

@ -26,7 +26,7 @@ public abstract class AbstractOptions<T extends AbstractOptions> { @@ -26,7 +26,7 @@ public abstract class AbstractOptions<T extends AbstractOptions> {
protected Integer timeoutMs = null;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*/
@SuppressWarnings("unchecked")
@ -36,7 +36,7 @@ public abstract class AbstractOptions<T extends AbstractOptions> { @@ -36,7 +36,7 @@ public abstract class AbstractOptions<T extends AbstractOptions> {
}
/**
* The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* The timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*/
public Integer timeoutMs() {

14
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

@ -30,6 +30,7 @@ import java.util.Map; @@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
@ -107,6 +108,7 @@ public class AdminClientConfig extends AbstractConfig { @@ -107,6 +108,7 @@ public class AdminClientConfig extends AbstractConfig {
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
/**
* <code>security.providers</code>
@ -143,7 +145,7 @@ public class AdminClientConfig extends AbstractConfig { @@ -143,7 +145,7 @@ public class AdminClientConfig extends AbstractConfig {
RETRY_BACKOFF_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
120000,
30000,
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
@ -154,10 +156,16 @@ public class AdminClientConfig extends AbstractConfig { @@ -154,10 +156,16 @@ public class AdminClientConfig extends AbstractConfig {
CONNECTIONS_MAX_IDLE_MS_DOC)
.define(RETRIES_CONFIG,
Type.INT,
5,
atLeast(0),
Integer.MAX_VALUE,
between(0, Integer.MAX_VALUE),
Importance.LOW,
CommonClientConfigs.RETRIES_DOC)
.define(DEFAULT_API_TIMEOUT_MS_CONFIG,
Type.INT,
60000,
atLeast(0),
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,

2
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java

@ -32,7 +32,7 @@ public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> { @@ -32,7 +32,7 @@ public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> {
private boolean validateOnly = false;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java

@ -30,7 +30,7 @@ import java.util.Collection; @@ -30,7 +30,7 @@ import java.util.Collection;
public class CreateAclsOptions extends AbstractOptions<CreateAclsOptions> {
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java

@ -32,7 +32,7 @@ public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> { @@ -32,7 +32,7 @@ public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {
private boolean validateOnly = false;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java

@ -30,7 +30,7 @@ import java.util.Collection; @@ -30,7 +30,7 @@ import java.util.Collection;
public class DeleteAclsOptions extends AbstractOptions<DeleteAclsOptions> {
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java

@ -30,7 +30,7 @@ import java.util.Collection; @@ -30,7 +30,7 @@ import java.util.Collection;
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java

@ -29,7 +29,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; @@ -29,7 +29,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
public class DescribeAclsOptions extends AbstractOptions<DescribeAclsOptions> {
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java

@ -30,7 +30,7 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio @@ -30,7 +30,7 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
private boolean includeAuthorizedOperations;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java

@ -32,7 +32,7 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio @@ -32,7 +32,7 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
private boolean includeSynonyms = false;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

2
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java

@ -32,7 +32,7 @@ public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions @@ -32,7 +32,7 @@ public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions
private boolean includeAuthorizedOperations;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

48
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -53,6 +53,7 @@ import org.apache.kafka.common.acl.AclBinding; @@ -53,6 +53,7 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@ -263,7 +264,12 @@ public class KafkaAdminClient extends AdminClient { @@ -263,7 +264,12 @@ public class KafkaAdminClient extends AdminClient {
/**
* The default timeout to use for an operation.
*/
private final int defaultTimeoutMs;
private final int defaultApiTimeoutMs;
/**
* The timeout to use for a single request.
*/
private final int requestTimeoutMs;
/**
* The name of this AdminClient instance.
@ -391,7 +397,7 @@ public class KafkaAdminClient extends AdminClient { @@ -391,7 +397,7 @@ public class KafkaAdminClient extends AdminClient {
private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
if (optionTimeoutMs != null)
return now + Math.max(0, optionTimeoutMs);
return now + defaultTimeoutMs;
return now + defaultApiTimeoutMs;
}
/**
@ -500,9 +506,10 @@ public class KafkaAdminClient extends AdminClient { @@ -500,9 +506,10 @@ public class KafkaAdminClient extends AdminClient {
KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
this.requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = configureDefaultApiTimeoutMs(config);
this.time = time;
this.metadataManager = metadataManager;
this.metrics = metrics;
@ -520,8 +527,29 @@ public class KafkaAdminClient extends AdminClient { @@ -520,8 +527,29 @@ public class KafkaAdminClient extends AdminClient {
thread.start();
}
Time time() {
return time;
/**
* If a default.api.timeout.ms has been explicitly specified, raise an error if it conflicts with request.timeout.ms.
* If no default.api.timeout.ms has been configured, then set its value as the max of the default and request.timeout.ms. Also we should probably log a warning.
* Otherwise, use the provided values for both configurations.
*
* @param config The configuration
*/
private int configureDefaultApiTimeoutMs(AdminClientConfig config) {
int requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
int defaultApiTimeoutMs = config.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
if (defaultApiTimeoutMs < requestTimeoutMs) {
if (config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
throw new ConfigException("The specified value of " + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG +
" must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + ".");
} else {
log.warn("Overriding the default value for {} ({}) with the explicitly configured request timeout {}",
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.defaultApiTimeoutMs,
requestTimeoutMs);
return requestTimeoutMs;
}
}
return defaultApiTimeoutMs;
}
@Override
@ -993,16 +1021,18 @@ public class KafkaAdminClient extends AdminClient { @@ -993,16 +1021,18 @@ public class KafkaAdminClient extends AdminClient {
continue;
}
Call call = calls.remove(0);
int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
AbstractRequest.Builder<?> requestBuilder;
try {
requestBuilder = call.createRequest(timeoutMs);
requestBuilder = call.createRequest(requestTimeoutMs);
} catch (Throwable throwable) {
call.fail(now, new KafkaException(String.format(
"Internal error sending %s to %s.", call.callName, node)));
continue;
}
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
true, requestTimeoutMs, null);
log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
client.send(clientRequest, now);
getOrCreateListValue(callsInFlight, node.idString()).add(call);
@ -1300,7 +1330,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1300,7 +1330,7 @@ public class KafkaAdminClient extends AdminClient {
* Create a new metadata call.
*/
private Call makeMetadataCall(long now) {
return new Call(true, "fetchMetadata", calcDeadlineMs(now, defaultTimeoutMs),
return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
public MetadataRequest.Builder createRequest(int timeoutMs) {

2
clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java

@ -30,7 +30,7 @@ public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> { @@ -30,7 +30,7 @@ public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
private boolean listInternal = false;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/

5
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -222,8 +222,7 @@ public class ConsumerConfig extends AbstractConfig { @@ -222,8 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
/** <code>default.api.timeout.ms</code> */
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a <code>timeout</code> parameter.";
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
@ -447,7 +446,7 @@ public class ConsumerConfig extends AbstractConfig { @@ -447,7 +446,7 @@ public class ConsumerConfig extends AbstractConfig {
60 * 1000,
atLeast(0),
Importance.MEDIUM,
DEFAULT_API_TIMEOUT_MS_DOC)
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,

206
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclBinding; @@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@ -108,11 +109,11 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; @@ -108,11 +109,11 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@ -123,9 +124,7 @@ import org.apache.kafka.common.resource.ResourceType; @@ -123,9 +124,7 @@ import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -144,6 +143,7 @@ import java.util.LinkedList; @@ -144,6 +143,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -178,6 +178,14 @@ public class KafkaAdminClientTest { @@ -178,6 +178,14 @@ public class KafkaAdminClientTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
final AdminClientConfig config = newConfMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500");
KafkaException exception = assertThrows(KafkaException.class,
() -> KafkaAdminClient.createInternal(config, null));
assertTrue(exception.getCause() instanceof ConfigException);
}
@Test
public void testGetOrCreateListValue() {
Map<String, List<String>> map = new HashMap<>();
@ -591,7 +599,7 @@ public class KafkaAdminClientTest { @@ -591,7 +599,7 @@ public class KafkaAdminClientTest {
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.RETRIES_CONFIG, "0"))) {
// The first request fails with a disconnect
@ -848,54 +856,6 @@ public class KafkaAdminClientTest { @@ -848,54 +856,6 @@ public class KafkaAdminClientTest {
}
}
/**
* Test handling timeouts.
*/
@Ignore // The test is flaky. Should be renabled when this JIRA is fixed: https://issues.apache.org/jira/browse/KAFKA-5792
@Test
public void testHandleTimeout() throws Exception {
MockTime time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
mockCluster(1, 0),
AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
assertEquals(time, env.time());
assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());
// Make a request with an extremely short timeout.
// Then wait for it to fail by not supplying any response.
log.info("Starting AdminClient#listTopics...");
final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000));
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return env.kafkaClient().hasInFlightRequests();
}
}, "Timed out waiting for inFlightRequests");
time.sleep(5000);
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return result.listings().isDone();
}
}, "Timed out waiting for listTopics to complete");
TestUtils.assertFutureError(result.listings(), TimeoutException.class);
log.info("Verified the error result of AdminClient#listTopics");
// The next request should succeed.
time.sleep(5000);
env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "foo"),
new DescribeConfigsResponse.Config(ApiError.NONE,
Collections.emptySet()))));
DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
new ConfigResource(ConfigResource.Type.TOPIC, "foo")));
time.sleep(5000);
result2.values().get(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get();
}
}
@Test
public void testDescribeConfigs() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@ -2554,6 +2514,146 @@ public class KafkaAdminClientTest { @@ -2554,6 +2514,146 @@ public class KafkaAdminClientTest {
errorsMap, memberIdentities.get(1), "For unit test").getClass());
}
@Test
public void testSuccessfulRetryAfterRequestTimeout() throws Exception {
HashMap<Integer, Node> nodes = new HashMap<>();
MockTime time = new MockTime();
Node node0 = new Node(0, "localhost", 8121);
nodes.put(0, node0);
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
final int requestTimeoutMs = 1000;
final int retryBackoffMs = 100;
final int apiTimeoutMs = 3000;
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
final ListTopicsResult result = env.adminClient()
.listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs));
// Wait until the first attempt has been sent, then advance the time
TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
"Timed out waiting for Metadata request to be sent");
time.sleep(requestTimeoutMs + 1);
// Wait for the request to be timed out before backing off
TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(),
"Timed out waiting for inFlightRequests to be timed out");
time.sleep(retryBackoffMs);
// Since api timeout bound is not hit, AdminClient should retry
TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
"Failed to retry Metadata request");
env.kafkaClient().respond(prepareMetadataResponse(cluster, Errors.NONE));
assertEquals(1, result.listings().get().size());
assertEquals("foo", result.listings().get().iterator().next().name());
}
}
@Test
public void testDefaultApiTimeout() throws Exception {
testApiTimeout(1500, 3000, OptionalInt.empty());
}
@Test
public void testDefaultApiTimeoutOverride() throws Exception {
testApiTimeout(1500, 10000, OptionalInt.of(3000));
}
private void testApiTimeout(int requestTimeoutMs,
int defaultApiTimeoutMs,
OptionalInt overrideApiTimeoutMs) throws Exception {
HashMap<Integer, Node> nodes = new HashMap<>();
MockTime time = new MockTime();
Node node0 = new Node(0, "localhost", 8121);
nodes.put(0, node0);
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
final int retryBackoffMs = 100;
final int effectiveTimeoutMs = overrideApiTimeoutMs.orElse(defaultApiTimeoutMs);
assertEquals("This test expects the effective timeout to be twice the request timeout",
2 * requestTimeoutMs, effectiveTimeoutMs);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs),
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeoutMs))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
ListTopicsOptions options = new ListTopicsOptions();
overrideApiTimeoutMs.ifPresent(options::timeoutMs);
final ListTopicsResult result = env.adminClient().listTopics(options);
// Wait until the first attempt has been sent, then advance the time
TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
"Timed out waiting for Metadata request to be sent");
time.sleep(requestTimeoutMs + 1);
// Wait for the request to be timed out before backing off
TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(),
"Timed out waiting for inFlightRequests to be timed out");
time.sleep(retryBackoffMs);
// Since api timeout bound is not hit, AdminClient should retry
TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
"Timed out waiting for Metadata request to be sent");
time.sleep(requestTimeoutMs + 1);
TestUtils.assertFutureThrows(result.future, TimeoutException.class);
}
}
@Test
public void testRequestTimeoutExceedingDefaultApiTimeout() throws Exception {
HashMap<Integer, Node> nodes = new HashMap<>();
MockTime time = new MockTime();
Node node0 = new Node(0, "localhost", 8121);
nodes.put(0, node0);
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
// This test assumes the default api timeout value of 60000. When the request timeout
// is set to something larger, we should adjust the api timeout accordingly for compatibility.
final int retryBackoffMs = 100;
final int requestTimeoutMs = 120000;
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
ListTopicsOptions options = new ListTopicsOptions();
final ListTopicsResult result = env.adminClient().listTopics(options);
// Wait until the first attempt has been sent, then advance the time by the default api timeout
TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
"Timed out waiting for Metadata request to be sent");
time.sleep(60001);
// The in-flight request should not be cancelled
assertTrue(env.kafkaClient().hasInFlightRequests());
// Now sleep the remaining time for the request timeout to expire
time.sleep(60000);
TestUtils.assertFutureThrows(result.future, TimeoutException.class);
}
}
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),

3
core/src/main/scala/kafka/admin/LeaderElectionCommand.scala

@ -79,7 +79,8 @@ object LeaderElectionCommand extends Logging { @@ -79,7 +79,8 @@ object LeaderElectionCommand extends Logging {
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
commandOptions.options.valueOf(commandOptions.bootstrapServer)
)
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
Admin.create(props)
}

1
core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala

@ -74,6 +74,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { @@ -74,6 +74,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
new Properties()
adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt))
adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString)
adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (timeout * 2).toString)
new AdminClientCommand(adminProps)
}

2
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@ -986,7 +986,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @@ -986,7 +986,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@Test
def testCallInFlightTimeouts(): Unit = {
val config = createConfig()
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000")
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory()
client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,

2
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala

@ -238,7 +238,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { @@ -238,7 +238,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
private def createAdminClient: Admin = {
val config = createConfig()
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000")
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000")
val client = Admin.create(config)
adminClients += client
client

3
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala

@ -309,7 +309,8 @@ object LeaderElectionCommandTest { @@ -309,7 +309,8 @@ object LeaderElectionCommandTest {
def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = {
Map(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers),
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000"
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000"
)
}

1
core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala

@ -67,6 +67,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @@ -67,6 +67,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "15000")
Admin.create(props)
}

Loading…
Cancel
Save