Browse Source

KAFKA-8962; Use least loaded node for AdminClient#describeTopics (#7421)

Allow routing of `AdminClient#describeTopics` to any broker in the cluster than just the controller, so that we don't create a hotspot for this API call. `AdminClient#describeTopics` uses the broker's metadata cache which is asynchronously maintained, so routing to brokers other than the controller is not expected to have a significant difference in terms of metadata consistency; all metadata requests are eventually consistent.

This patch also fixes a few flaky test failures.

Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/7556/head
Dhruvil Shah 5 years ago committed by Jason Gustafson
parent
commit
317089663c
  1. 2
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  2. 2
      core/src/main/scala/kafka/controller/KafkaController.scala
  3. 153
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
  4. 23
      core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
  5. 26
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
  6. 36
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

2
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -1537,7 +1537,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1537,7 +1537,7 @@ public class KafkaAdminClient extends AdminClient {
}
final long now = time.milliseconds();
Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
new LeastLoadedNodeProvider()) {
private boolean supportsDisablingTopicCreation = true;

2
core/src/main/scala/kafka/controller/KafkaController.scala

@ -790,7 +790,7 @@ class KafkaController(val config: KafkaConfig, @@ -790,7 +790,7 @@ class KafkaController(val config: KafkaConfig,
electionType: ElectionType,
electionTrigger: ElectionTrigger
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")} triggerd by $electionTrigger")
info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")} triggered by $electionTrigger")
try {
val strategy = electionType match {
case ElectionType.PREFERRED => PreferredReplicaPartitionLeaderElectionStrategy

153
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -37,10 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords @@ -37,10 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicPartition, TopicPartitionInfo, TopicPartitionReplica}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
@ -296,15 +293,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -296,15 +293,14 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List())
// without includeAuthorizedOperations flag
var topicResult = client.describeTopics(Seq(topic).asJava).values
assertEquals(Set().asJava, topicResult.get(topic).get().authorizedOperations())
var topicResult = getTopicMetadata(client, topic)
assertEquals(Set().asJava, topicResult.authorizedOperations)
//with includeAuthorizedOperations flag
topicResult = client.describeTopics(Seq(topic).asJava,
new DescribeTopicsOptions().includeAuthorizedOperations(true)).values
topicResult = getTopicMetadata(client, topic, new DescribeTopicsOptions().includeAuthorizedOperations(true))
expectedOperations = Topic.supportedOperations
.map(operation => operation.toJava).asJava
assertEquals(expectedOperations, topicResult.get(topic).get().authorizedOperations())
assertEquals(expectedOperations, topicResult.authorizedOperations)
}
def configuredClusterPermissions() : Set[AclOperation] = {
@ -559,17 +555,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -559,17 +555,19 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
createTopic(topic2, numPartitions = 1, replicationFactor = 2)
// assert that both the topics have 1 partition
assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
assertEquals(1, client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size)
val topic1_metadata = getTopicMetadata(client, topic1)
val topic2_metadata = getTopicMetadata(client, topic2)
assertEquals(1, topic1_metadata.partitions.size)
assertEquals(1, topic2_metadata.partitions.size)
val validateOnly = new CreatePartitionsOptions().validateOnly(true)
val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
def partitions(topic: String) =
client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions
def partitions(topic: String, expectedNumPartitionsOpt: Option[Int] = None): util.List[TopicPartitionInfo] = {
getTopicMetadata(client, topic, expectedNumPartitionsOpt = expectedNumPartitionsOpt).partitions
}
def numPartitions(topic: String) =
partitions(topic).size
def numPartitions(topic: String): Int = partitions(topic).size
// validateOnly: try creating a new partition (no assignments), to bring the total to 3 partitions
var alterResult = client.createPartitions(Map(topic1 ->
@ -581,7 +579,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -581,7 +579,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
alterResult = client.createPartitions(Map(topic1 ->
NewPartitions.increaseTo(3)).asJava, actuallyDoIt)
altered = alterResult.values.get(topic1).get
assertEquals(3, numPartitions(topic1))
TestUtils.waitUntilTrue(() => numPartitions(topic1) == 3, "Timed out waiting for new partitions to appear")
// validateOnly: now try creating a new partition (with assignments), to bring the total to 3 partitions
val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1), asList(1, 2))
@ -594,7 +592,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -594,7 +592,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
alterResult = client.createPartitions(Map(topic2 ->
NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, actuallyDoIt)
altered = alterResult.values.get(topic2).get
val actualPartitions2 = partitions(topic2)
val actualPartitions2 = partitions(topic2, expectedNumPartitionsOpt = Some(3))
assertEquals(3, actualPartitions2.size)
assertEquals(Seq(0, 1), actualPartitions2.get(1).replicas.asScala.map(_.id).toList)
assertEquals(Seq(1, 2), actualPartitions2.get(2).replicas.asScala.map(_.id).toList)
@ -782,7 +780,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -782,7 +780,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt)
// assert that the topic1 now has 4 partitions
altered = alterResult.values.get(topic1).get
assertEquals(4, numPartitions(topic1))
TestUtils.waitUntilTrue(() => numPartitions(topic1) == 4, "Timed out waiting for new partitions to appear")
try {
altered = alterResult.values.get(topic2).get
} catch {
@ -1452,15 +1450,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1452,15 +1450,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers)
def preferredLeader(topicPartition: TopicPartition) =
client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
get.partitions.get(topicPartition.partition).replicas.get(0).id
def preferredLeader(topicPartition: TopicPartition): Int = {
val partitionMetadata = getTopicMetadata(client, topicPartition.topic).partitions.get(topicPartition.partition)
val preferredLeaderMetadata = partitionMetadata.replicas.get(0)
preferredLeaderMetadata.id
}
/** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */
def changePreferredLeader(newAssignment: Seq[Int]) = {
val preferred = newAssignment.head
val prior1 = TestUtils.currentLeader(client, partition1).get
val prior2 = TestUtils.currentLeader(client, partition2).get
val prior1 = zkClient.getLeaderForPartition(partition1).get
val prior2 = zkClient.getLeaderForPartition(partition2).get
var m = Map.empty[TopicPartition, Seq[Int]]
@ -1475,26 +1475,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1475,26 +1475,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
10000)
// Check the leader hasn't moved
assertEquals(Some(prior1), TestUtils.currentLeader(client, partition1))
assertEquals(Some(prior2), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, prior1)
TestUtils.assertLeader(client, partition2, prior2)
}
// Check current leaders are 0
assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, 0)
TestUtils.assertLeader(client, partition2, 0)
// Noop election
var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
var exception = electResult.partitions.get.get(partition1).get
assertEquals(classOf[ElectionNotNeededException], exception.getClass)
assertEquals("Leader election not needed for topic partition", exception.getMessage)
assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
TestUtils.assertLeader(client, partition1, 0)
// Noop election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertTrue(electResult.partitions.get.isEmpty)
assertEquals(Some(0), TestUtils.currentLeader(client, partition1))
assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, 0)
TestUtils.assertLeader(client, partition2, 0)
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
@ -1503,17 +1503,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1503,17 +1503,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
TestUtils.waitForLeaderToBecome(client, partition1, Some(1))
TestUtils.assertLeader(client, partition1, 1)
// topic 2 unchanged
assertFalse(electResult.partitions.get.containsKey(partition2))
assertEquals(Some(0), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition2, 0)
// meaningful election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
TestUtils.waitForLeaderToBecome(client, partition2, Some(1))
TestUtils.assertLeader(client, partition2, 1)
// unknown topic
val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
@ -1522,8 +1522,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1522,8 +1522,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
exception = electResult.partitions.get.get(unknownPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
assertEquals("The partition does not exist.", exception.getMessage)
assertEquals(Some(1), TestUtils.currentLeader(client, partition1))
assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, 1)
TestUtils.assertLeader(client, partition2, 1)
// Now change the preferred leader to 2
changePreferredLeader(prefer2)
@ -1531,8 +1531,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1531,8 +1531,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// mixed results
electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition, partition1).asJava)
assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
TestUtils.waitForLeaderToBecome(client, partition1, Some(2))
assertEquals(Some(1), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, 2)
TestUtils.assertLeader(client, partition2, 1)
exception = electResult.partitions.get.get(unknownPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
assertEquals("The partition does not exist.", exception.getMessage)
@ -1541,7 +1541,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1541,7 +1541,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
TestUtils.waitForLeaderToBecome(client, partition2, Some(2))
TestUtils.assertLeader(client, partition2, 2)
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
@ -1557,7 +1557,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1557,7 +1557,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
TestUtils.assertLeader(client, partition1, 2)
// preferred leader unavailable with null argument
electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
@ -1572,8 +1572,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1572,8 +1572,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
assertEquals(Some(2), TestUtils.currentLeader(client, partition1))
assertEquals(Some(2), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, 2)
TestUtils.assertLeader(client, partition2, 2)
}
@Test
@ -1588,17 +1588,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1588,17 +1588,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val partition1 = new TopicPartition("unclean-test-topic-1", 0)
TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1), servers)
TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
TestUtils.assertLeader(client, partition1, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
TestUtils.waitForLeaderToBecome(client, partition1, None)
TestUtils.assertNoLeader(client, partition1)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
TestUtils.assertLeader(client, partition1, broker2)
}
@Test
@ -1622,21 +1622,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1622,21 +1622,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
servers
)
TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
servers(broker1).shutdown()
TestUtils.waitForLeaderToBecome(client, partition1, None)
TestUtils.waitForLeaderToBecome(client, partition2, None)
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertNoLeader(client, partition2)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
assertEquals(Option(broker2), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, broker2)
TestUtils.assertLeader(client, partition2, broker2)
}
@Test
@ -1661,21 +1661,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1661,21 +1661,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
servers
)
TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
TestUtils.waitForLeaderToBecome(client, partition1, None)
TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertLeader(client, partition2, broker3)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertFalse(electResult.partitions.get.containsKey(partition2))
assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, broker2)
TestUtils.assertLeader(client, partition2, broker3)
}
@Test
@ -1698,7 +1698,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1698,7 +1698,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
servers
)
TestUtils.waitForLeaderToBecome(client, new TopicPartition(topic, 0), Option(broker1))
TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1)
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(unknownPartition, unknownTopic).asJava)
assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
@ -1724,12 +1724,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1724,12 +1724,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
servers
)
TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
TestUtils.assertLeader(client, partition1, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
TestUtils.waitForLeaderToBecome(client, partition1, None)
TestUtils.assertNoLeader(client, partition1)
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
@ -1754,10 +1754,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1754,10 +1754,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
servers
)
TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
TestUtils.assertLeader(client, partition1, broker1)
servers(broker1).shutdown()
TestUtils.waitForLeaderToBecome(client, partition1, Some(broker2))
TestUtils.assertLeader(client, partition1, broker2)
servers(broker1).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
@ -1786,21 +1786,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1786,21 +1786,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
servers
)
TestUtils.waitForLeaderToBecome(client, partition1, Option(broker1))
TestUtils.waitForLeaderToBecome(client, partition2, Option(broker1))
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
servers(broker2).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
servers(broker1).shutdown()
TestUtils.waitForLeaderToBecome(client, partition1, None)
TestUtils.waitForLeaderToBecome(client, partition2, Some(broker3))
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertLeader(client, partition2, broker3)
servers(broker2).startup()
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
assertEquals(Option(broker2), TestUtils.currentLeader(client, partition1))
assertEquals(Option(broker3), TestUtils.currentLeader(client, partition2))
TestUtils.assertLeader(client, partition1, broker2)
TestUtils.assertLeader(client, partition2, broker3)
}
@Test
@ -2428,4 +2428,23 @@ object AdminClientIntegrationTest { @@ -2428,4 +2428,23 @@ object AdminClientIntegrationTest {
assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}
private def getTopicMetadata(client: Admin,
topic: String,
describeOptions: DescribeTopicsOptions = new DescribeTopicsOptions,
expectedNumPartitionsOpt: Option[Int] = None): TopicDescription = {
var result: TopicDescription = null
TestUtils.waitUntilTrue(() => {
val topicResult = client.describeTopics(Set(topic).asJava, describeOptions).values.get(topic)
try {
result = topicResult.get
expectedNumPartitionsOpt.map(_ == result.partitions.size).getOrElse(true)
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false // metadata may not have propagated yet, so retry
}
}, s"Timed out waiting for metadata for $topic")
result
}
}

23
core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala

@ -24,7 +24,7 @@ import kafka.utils.TestUtils._ @@ -24,7 +24,7 @@ import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.Assert.{assertEquals, assertTrue}
@ -33,6 +33,7 @@ import org.junit.{After, Assert, Before, Test} @@ -33,6 +33,7 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}
class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
@ -448,8 +449,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with @@ -448,8 +449,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
validateMetadataAndConfigs(createResult)
val createResponseConfig = createResult.config(topic1).get().entries.asScala
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
val describeResponseConfig = client.describeConfigs(List(topicResource).asJava).values.get(topicResource).get().entries.asScala
val describeResponseConfig = describeConfigs(topic1)
assertEquals(describeResponseConfig.map(_.name).toSet, createResponseConfig.map(_.name).toSet)
describeResponseConfig.foreach { describeEntry =>
val name = describeEntry.name
@ -461,6 +461,23 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with @@ -461,6 +461,23 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
}
}
private def describeConfigs(topic: String): Iterable[ConfigEntry] = {
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
var configEntries: Iterable[ConfigEntry] = null
TestUtils.waitUntilTrue(() => {
try {
val topicResponse = client.describeConfigs(List(topicResource).asJava).all.get.get(topicResource)
configEntries = topicResponse.entries.asScala
true
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
}
}, "Timed out waiting for describeConfigs")
configEntries
}
private def waitForDescribeAcls(client: Admin, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
var lastResults: util.Collection[AclBinding] = null
TestUtils.waitUntilTrue(() => {

26
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala

@ -80,12 +80,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -80,12 +80,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
servers(broker2).shutdown()
TestUtils.waitForLeaderToBecome(client, topicPartition, None)
TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
LeaderElectionCommand.main(
@ -96,7 +96,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -96,7 +96,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
)
)
assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
TestUtils.assertLeader(client, topicPartition, broker3)
}
}
@ -111,12 +111,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -111,12 +111,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
servers(broker2).shutdown()
TestUtils.waitForLeaderToBecome(client, topicPartition, None)
TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
LeaderElectionCommand.main(
@ -128,7 +128,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -128,7 +128,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
)
)
assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
TestUtils.assertLeader(client, topicPartition, broker3)
}
}
@ -143,12 +143,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -143,12 +143,12 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker3).shutdown()
TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
servers(broker2).shutdown()
TestUtils.waitForLeaderToBecome(client, topicPartition, None)
TestUtils.assertNoLeader(client, topicPartition)
servers(broker3).startup()
val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
@ -161,7 +161,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -161,7 +161,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
)
)
assertEquals(Option(broker3), TestUtils.currentLeader(client, topicPartition))
TestUtils.assertLeader(client, topicPartition, broker3)
}
}
@ -176,10 +176,10 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -176,10 +176,10 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, partition)
TestUtils.waitForLeaderToBecome(client, topicPartition, Option(broker2))
TestUtils.assertLeader(client, topicPartition, broker2)
servers(broker2).shutdown()
TestUtils.waitForLeaderToBecome(client, topicPartition, Some(broker3))
TestUtils.assertLeader(client, topicPartition, broker3)
servers(broker2).startup()
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
@ -191,7 +191,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -191,7 +191,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
)
)
assertEquals(Option(broker2), TestUtils.currentLeader(client, topicPartition))
TestUtils.assertLeader(client, topicPartition, broker2)
}
}
@ -273,7 +273,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness { @@ -273,7 +273,7 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
LeaderElectionCommand.main(
Array(
"--bootstrap-server", bootstrapServers(servers),
"--election-type", "preferrred"
"--election-type", "preferred"
)
)
fail()

36
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -28,8 +28,8 @@ import java.util.Arrays @@ -28,8 +28,8 @@ import java.util.Arrays
import java.util.Collections
import java.util.Properties
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
import kafka.log._
@ -49,6 +49,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce @@ -49,6 +49,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBindingFilter}
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
@ -1488,24 +1489,27 @@ object TestUtils extends Logging { @@ -1488,24 +1489,27 @@ object TestUtils extends Logging {
adminClient.alterConfigs(configs)
}
def currentLeader(client: Admin, topicPartition: TopicPartition): Option[Int] = {
Option(
client
.describeTopics(Arrays.asList(topicPartition.topic))
.all
.get
.get(topicPartition.topic)
.partitions
.get(topicPartition.partition)
.leader
).map(_.id)
def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Unit = {
waitForLeaderToBecome(client, topicPartition, Some(expectedLeader))
}
def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = {
waitForLeaderToBecome(client, topicPartition, None)
}
def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
TestUtils.waitUntilTrue(
() => currentLeader(client, topicPartition) == leader,
s"Expected leader to become $leader", 10000
)
val topic = topicPartition.topic
val partition = topicPartition.partition
TestUtils.waitUntilTrue(() => {
try {
val topicResult = client.describeTopics(Arrays.asList(topic)).all.get.get(topic)
val partitionResult = topicResult.partitions.get(partition)
Option(partitionResult.leader).map(_.id) == leader
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
}
}, "Timed out waiting for leader metadata")
}
def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {

Loading…
Cancel
Save