Browse Source

KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)

AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how Consumer and Producer clients handle new calls after closed down.

Reviewers: Luke Chen <showuon@gmail.com>, Kirk True <kirk@kirktrue.pro>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, vamossagar12 <sagarmeansocean@gmail.com>
pull/14525/head
Gantigmaa Selenge 1 year ago committed by GitHub
parent
commit
3c9031c624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  2. 12
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  3. 4
      core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

5
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient { @@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient {
*/
void call(Call call, long now) {
if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
call.handleTimeoutFailure(time.milliseconds(),
new TimeoutException("The AdminClient thread is not accepting new calls."));
log.debug("Cannot accept new call {} when AdminClient is closing.", call);
call.handleFailure(new IllegalStateException("Cannot accept new calls when AdminClient is closing."));
} else if (metadataManager.usingBootstrapControllers() &&
(!call.nodeProvider.supportsUseControllers())) {
call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " +

12
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -478,6 +478,18 @@ public class KafkaAdminClientTest { @@ -478,6 +478,18 @@ public class KafkaAdminClientTest {
callbackCalled.acquire();
}
@Test
public void testAdminClientFailureWhenClosed() {
MockTime time = new MockTime();
AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0));
env.adminClient().close();
ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics(
singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(10000)).all().get());
assertTrue(e.getCause() instanceof IllegalStateException,
"Expected an IllegalStateException error, but got " + Utils.stackTrace(e));
}
private static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors error) {
return new OffsetDeleteResponse(
new OffsetDeleteResponseData()

4
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
/**
* Test closing the AdminClient with a generous timeout. Calls in progress should be completed,
* since they can be done within the timeout. New calls should receive timeouts.
* since they can be done within the timeout. New calls should receive exceptions.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
client.close(time.Duration.ofHours(2))
val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException])
future.get
client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect
}

Loading…
Cancel
Save