Browse Source

KAFKA-4708; Fix transient failure in BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>, Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2489 from cmccabe/KAFKA-4708
pull/2584/head
Colin P. Mccabe 8 years ago committed by Ismael Juma
parent
commit
913c09e4a9
  1. 14
      core/src/main/scala/kafka/admin/AdminClient.scala
  2. 4
      core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
  3. 2
      core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala

14
core/src/main/scala/kafka/admin/AdminClient.scala

@ -85,7 +85,19 @@ class AdminClient(val time: Time, @@ -85,7 +85,19 @@ class AdminClient(val time: Time,
response.apiVersions.asScala.toList
}
private def findAllBrokers(): List[Node] = {
/**
* Wait until there is a non-empty list of brokers in the cluster.
*/
def awaitBrokers() {
var nodes = List[Node]()
do {
nodes = findAllBrokers()
if (nodes.isEmpty)
Thread.sleep(50)
} while (nodes.isEmpty)
}
def findAllBrokers(): List[Node] = {
val request = MetadataRequest.Builder.allTopics()
val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
val errors = response.errors

4
core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala

@ -24,6 +24,7 @@ import kafka.utils.CommandLineUtils @@ -24,6 +24,7 @@ import kafka.utils.CommandLineUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
import org.apache.kafka.common.Node
import scala.util.{Failure, Success}
@ -39,7 +40,8 @@ object BrokerApiVersionsCommand { @@ -39,7 +40,8 @@ object BrokerApiVersionsCommand {
def execute(args: Array[String], out: PrintStream): Unit = {
val opts = new BrokerVersionCommandOptions(args)
val adminClient = createAdminClient(opts)
val brokerMap = adminClient.listAllBrokerVersionInfo()
adminClient.awaitBrokers()
var brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.foreach { case (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")

2
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala

@ -33,7 +33,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { @@ -33,7 +33,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
@Test
@Test(timeout=120000)
def checkBrokerApiVersionCommandOutput() {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream)

Loading…
Cancel
Save