diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 4db6b271946..27d28b5b336 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient { */ void call(Call call, long now) { if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { - log.debug("The AdminClient is not accepting new calls. Timing out {}.", call); - call.handleTimeoutFailure(time.milliseconds(), - new TimeoutException("The AdminClient thread is not accepting new calls.")); + log.debug("Cannot accept new call {} when AdminClient is closing.", call); + call.handleFailure(new IllegalStateException("Cannot accept new calls when AdminClient is closing.")); } else if (metadataManager.usingBootstrapControllers() && (!call.nodeProvider.supportsUseControllers())) { call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " + diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 229d3119871..378e08b2c45 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -478,6 +478,18 @@ public class KafkaAdminClientTest { callbackCalled.acquire(); } + @Test + public void testAdminClientFailureWhenClosed() { + MockTime time = new MockTime(); + AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0)); + env.adminClient().close(); + ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics( + singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), + new CreateTopicsOptions().timeoutMs(10000)).all().get()); + assertTrue(e.getCause() instanceof IllegalStateException, + "Expected an IllegalStateException error, but got " + Utils.stackTrace(e)); + } + private static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors error) { return new OffsetDeleteResponse( new OffsetDeleteResponseData() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 784374d23e8..5bb3533146c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { /** * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, - * since they can be done within the timeout. New calls should receive timeouts. + * since they can be done within the timeout. New calls should receive exceptions. */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) @@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() client.close(time.Duration.ofHours(2)) val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() - assertFutureExceptionTypeEquals(future2, classOf[TimeoutException]) + assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException]) future.get client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect }