Browse Source

KAFKA-8837: KafkaMetricReporterClusterIdTest may not shutdown ZooKeeperTestHarness (#7255)

- Call `assertNoNonDaemonThreads` in test method instead of tear down method
to avoid situation where parent's class tear down is not invoked.
- Pass the thread prefix in tests that call `assertNoNonDaemonThreads` so that it
works correctly.
- Rename `verifyNonDaemonThreadsStatus` to `assertNoNonDaemonThreads` to
make it clear that it may throw.

Reviewers: Anna Povzner <anna@confluent.io>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
pull/7266/head
Anastasia Vela 5 years ago committed by Ismael Juma
parent
commit
d32a2d1275
  1. 12
      core/src/main/scala/kafka/server/KafkaServerStartable.scala
  2. 6
      core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
  3. 2
      core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
  4. 24
      core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
  5. 26
      core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
  6. 14
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

12
core/src/main/scala/kafka/server/KafkaServerStartable.scala

@ -25,14 +25,18 @@ import kafka.utils.{Exit, Logging, VerifiableProperties} @@ -25,14 +25,18 @@ import kafka.utils.{Exit, Logging, VerifiableProperties}
import scala.collection.Seq
object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
def fromProps(serverProps: Properties): KafkaServerStartable = {
fromProps(serverProps, None)
}
def fromProps(serverProps: Properties, threadNamePrefix: Option[String]): KafkaServerStartable = {
val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters)
new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters, threadNamePrefix)
}
}
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

6
core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala

@ -88,7 +88,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { @@ -88,7 +88,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
props.setProperty(KafkaConfig.BrokerIdProp, "-1")
config = KafkaConfig.fromProps(props)
server = KafkaServerStartable.fromProps(props)
server = KafkaServerStartable.fromProps(props, threadNamePrefix = Option(this.getClass.getName))
server.startup()
}
@ -104,13 +104,15 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { @@ -104,13 +104,15 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
assertEquals(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId(),
KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
server.shutdown()
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@After
override def tearDown(): Unit = {
server.shutdown()
CoreUtils.delete(config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
super.tearDown()
}
}

2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

@ -144,7 +144,7 @@ class ReplicaManagerTest { @@ -144,7 +144,7 @@ class ReplicaManagerTest {
rm.shutdown(checkpointHW = false)
}
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test

24
core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala

@ -59,11 +59,11 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -59,11 +59,11 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
server1.shutdown()
assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
// restart the server check to see if it uses the brokerId generated previously
server1 = TestUtils.createServer(config1)
server1 = TestUtils.createServer(config1, threadNamePrefix = Option(this.getClass.getName))
servers = Seq(server1)
assertEquals(server1.config.brokerId, 1001)
server1.shutdown()
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -72,7 +72,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -72,7 +72,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
val server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
val server2 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName))
val props3 = TestUtils.createBrokerConfig(-1, zkConnect)
val server3 = new KafkaServer(KafkaConfig.fromProps(props3))
val server3 = new KafkaServer(KafkaConfig.fromProps(props3), threadNamePrefix = Option(this.getClass.getName))
server1.startup()
assertEquals(server1.config.brokerId, 1001)
server2.startup()
@ -84,7 +84,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -84,7 +84,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001))
assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0))
assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -94,12 +94,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -94,12 +94,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// Set reserve broker ids to cause collision and ensure disabling broker id generation ignores the setting
props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
val config3 = KafkaConfig.fromProps(props3)
val server3 = TestUtils.createServer(config3)
val server3 = TestUtils.createServer(config3, threadNamePrefix = Option(this.getClass.getName))
servers = Seq(server3)
assertEquals(server3.config.brokerId, 3)
server3.shutdown()
assertTrue(verifyBrokerMetadata(server3.config.logDirs, 3))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -123,7 +123,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -123,7 +123,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
servers = Seq(server1)
server1.shutdown()
assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -140,7 +140,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -140,7 +140,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
case _: kafka.common.InconsistentBrokerIdException => //success
}
server1.shutdown()
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -148,12 +148,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -148,12 +148,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// Start a good server
val propsA = TestUtils.createBrokerConfig(1, zkConnect)
val configA = KafkaConfig.fromProps(propsA)
val serverA = TestUtils.createServer(configA)
val serverA = TestUtils.createServer(configA, threadNamePrefix = Option(this.getClass.getName))
// Start a server that collides on the broker id
val propsB = TestUtils.createBrokerConfig(1, zkConnect)
val configB = KafkaConfig.fromProps(propsB)
val serverB = new KafkaServer(configB)
val serverB = new KafkaServer(configB, threadNamePrefix = Option(this.getClass.getName))
intercept[NodeExistsException] {
serverB.startup()
}
@ -168,7 +168,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -168,7 +168,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// adjust the broker config and start again
propsB.setProperty(KafkaConfig.BrokerIdProp, "2")
val newConfigB = KafkaConfig.fromProps(propsB)
val newServerB = TestUtils.createServer(newConfigB)
val newServerB = TestUtils.createServer(newConfigB, threadNamePrefix = Option(this.getClass.getName))
servers = Seq(serverA, newServerB)
serverA.shutdown()
@ -177,7 +177,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { @@ -177,7 +177,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// verify correct broker metadata was written
assertTrue(verifyBrokerMetadata(serverA.config.logDirs, 1))
assertTrue(verifyBrokerMetadata(newServerB.config.logDirs, 2))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {

26
core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala

@ -59,7 +59,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -59,7 +59,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
// Make sure that the cluster id doesn't exist yet.
assertFalse(zkClient.getClusterId.isDefined)
var server1 = TestUtils.createServer(config1)
var server1 = TestUtils.createServer(config1, threadNamePrefix = Option(this.getClass.getName))
servers = Seq(server1)
// Validate the cluster id
@ -73,7 +73,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -73,7 +73,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
assertEquals(zkClient.getClusterId, Some(clusterIdOnFirstBoot))
// Restart the server check to confirm that it uses the clusterId generated previously
server1 = TestUtils.createServer(config1)
server1 = TestUtils.createServer(config1, threadNamePrefix = Option(this.getClass.getName))
servers = Seq(server1)
val clusterIdOnSecondBoot = server1.clusterId
@ -85,18 +85,18 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -85,18 +85,18 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getClusterId.isDefined)
assertEquals(zkClient.getClusterId, Some(clusterIdOnFirstBoot))
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testAutoGenerateClusterIdForKafkaClusterSequential(): Unit = {
val server1 = TestUtils.createServer(config1)
val server1 = TestUtils.createServer(config1, threadNamePrefix = Option(this.getClass.getName))
val clusterIdFromServer1 = server1.clusterId
val server2 = TestUtils.createServer(config2)
val server2 = TestUtils.createServer(config2, threadNamePrefix = Option(this.getClass.getName))
val clusterIdFromServer2 = server2.clusterId
val server3 = TestUtils.createServer(config3)
val server3 = TestUtils.createServer(config3, threadNamePrefix = Option(this.getClass.getName))
val clusterIdFromServer3 = server3.clusterId
servers = Seq(server1, server2, server3)
@ -115,12 +115,12 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -115,12 +115,12 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
servers.foreach(_.shutdown())
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testAutoGenerateClusterIdForKafkaClusterParallel(): Unit = {
val firstBoot = Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config)))
val firstBoot = Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config, threadNamePrefix = Option(this.getClass.getName))))
servers = Await.result(firstBoot, 100 second)
val Seq(server1, server2, server3) = servers
@ -142,13 +142,13 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -142,13 +142,13 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
servers.foreach(_.shutdown())
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testConsistentClusterIdFromZookeeperAndFromMetaProps() = {
// Check at the first boot
val server = TestUtils.createServer(config1)
val server = TestUtils.createServer(config1, threadNamePrefix = Option(this.getClass.getName))
val clusterId = server.clusterId
assertTrue(verifyBrokerMetadata(server.config.logDirs, clusterId))
@ -163,7 +163,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -163,7 +163,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
server.shutdown()
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -179,7 +179,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -179,7 +179,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
server.shutdown()
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
@ -205,7 +205,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { @@ -205,7 +205,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
server.shutdown()
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
def forgeBrokerMetadata(logDirs: Seq[String], brokerId: Int, clusterId: String): Unit = {

14
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -143,7 +143,15 @@ object TestUtils extends Logging { @@ -143,7 +143,15 @@ object TestUtils extends Logging {
* @param config The configuration of the server
*/
def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = {
val server = new KafkaServer(config, time)
createServer(config, time, None)
}
def createServer(config: KafkaConfig, threadNamePrefix: Option[String]): KafkaServer = {
createServer(config, Time.SYSTEM, threadNamePrefix)
}
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String]): KafkaServer = {
val server = new KafkaServer(config, time, threadNamePrefix = threadNamePrefix)
server.startup()
server
}
@ -998,7 +1006,9 @@ object TestUtils extends Logging { @@ -998,7 +1006,9 @@ object TestUtils extends Logging {
"Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get))
}
def verifyNonDaemonThreadsStatus(threadNamePrefix: String): Unit = {
// Note: Call this method in the test itself, rather than the @After method.
// Because of the assert, if assertNoNonDaemonThreads fails, nothing after would be executed.
def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = {
val threadCount = Thread.getAllStackTraces.keySet.asScala.count { t =>
!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
}

Loading…
Cancel
Save