diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java new file mode 100644 index 00000000000..88b2148d364 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +public class AdminClientTestUtils { + + /** + * Helper to create a ListPartitionReassignmentsResult instance for a given Throwable. + * ListPartitionReassignmentsResult's constructor is only accessible from within the + * admin package. + */ + public static ListPartitionReassignmentsResult listPartitionReassignmentsResult(Throwable t) { + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + future.completeExceptionally(t); + return new ListPartitionReassignmentsResult(future); + } +} diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8c1629bc0d4..16075d0ff90 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -28,11 +28,11 @@ import kafka.utils.Implicits._ import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListPartitionReassignmentsOptions, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig} +import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig} import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo} import org.apache.kafka.common.config.ConfigResource.Type import org.apache.kafka.common.config.{ConfigResource, TopicConfig} -import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException, UnsupportedVersionException} +import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, TopicExistsException, UnsupportedVersionException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.{Time, Utils} @@ -284,13 +284,12 @@ object TopicCommand extends Logging { private def listAllReassignments(topicPartitions: util.Set[TopicPartition]): Map[TopicPartition, PartitionReassignment] = { try { - adminClient.listPartitionReassignments(topicPartitions, new ListPartitionReassignmentsOptions) - .reassignments().get().asScala + adminClient.listPartitionReassignments(topicPartitions).reassignments().get().asScala } catch { case e: ExecutionException => e.getCause match { - case ex: UnsupportedVersionException => - logger.debug("Couldn't query reassignments through the AdminClient API", ex) + case ex @ (_: UnsupportedVersionException | _: ClusterAuthorizationException) => + logger.debug(s"Couldn't query reassignments through the AdminClient API: ${ex.getMessage}", ex) Map() case t => throw t } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index fdf110cbcef..a1e49e46cb9 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.ClusterAuthorizationException import org.apache.kafka.common.errors.TopicExistsException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.ListenerName @@ -36,6 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.junit.rules.TestName import org.junit.{After, Before, Rule, Test} +import org.mockito.Mockito import org.scalatest.Assertions.{fail, intercept} import scala.jdk.CollectionConverters._ @@ -802,4 +804,29 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)) } + @Test + def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(): Unit = { + adminClient = Mockito.spy(adminClient) + topicService = AdminClientTopicService(adminClient) + + val result = AdminClientTestUtils.listPartitionReassignmentsResult( + new ClusterAuthorizationException("Unauthorized")) + + // Passing `null` here to help the compiler disambiguate the `doReturn` methods, + // compilation for scala 2.12 fails otherwise. + Mockito.doReturn(result, null).when(adminClient).listPartitionReassignments( + Set(new TopicPartition(testTopicName, 0)).asJava + ) + + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, 1.toShort)) + ).all().get() + waitForTopicCreated(testTopicName) + + val output = TestUtils.grabConsoleOutput( + topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName)))) + val rows = output.split("\n") + assertEquals(2, rows.size) + rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1") + } }