From 41189ea5601837bdb697ade31f55e244abbe6d1c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 23 Feb 2015 11:51:32 -0800 Subject: [PATCH] kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede --- .../scala/kafka/server/KafkaHealthcheck.scala | 7 +--- .../main/scala/kafka/server/KafkaServer.scala | 2 - core/src/main/scala/kafka/utils/ZkUtils.scala | 6 --- .../unit/kafka/server/ServerStartupTest.scala | 42 +++++++++++++------ 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 4acdd70fe9c..7907987e434 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -39,17 +39,12 @@ class KafkaHealthcheck(private val brokerId: Int, val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener - + def startup() { zkClient.subscribeStateChanges(sessionExpireListener) register() } - def shutdown() { - zkClient.unsubscribeStateChanges(sessionExpireListener) - ZkUtils.deregisterBrokerInZk(zkClient, brokerId) - } - /** * Register this broker as "alive" in zookeeper */ diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7e5ddcb9be8..426e522fc98 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (canShutdown) { Utils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) - if(kafkaHealthcheck != null) - Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) Utils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index c78a1b6ff42..8a2fb2d9a42 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -189,12 +189,6 @@ object ZkUtils extends Logging { info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } - def deregisterBrokerInZk(zkClient: ZkClient, id: Int) { - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id - deletePath(zkClient, brokerIdPath) - info("Deregistered broker %d at path %s.".format(id, brokerIdPath)) - } - def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { val topicDirs = new ZKGroupTopicDirs(group, topic) topicDirs.consumerOwnerDir + "/" + partition diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 764655a6883..93af7dfcec7 100644 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -26,26 +26,44 @@ import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness { - var server : KafkaServer = null - val brokerId = 0 - val zookeeperChroot = "/kafka-chroot-for-unittest" - override def setUp() { - super.setUp() + def testBrokerCreatesZKChroot { + val brokerId = 0 + val zookeeperChroot = "/kafka-chroot-for-unittest" val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) val zooKeeperConnect = props.get("zookeeper.connect") props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot) - server = TestUtils.createServer(new KafkaConfig(props)) - } + val server = TestUtils.createServer(new KafkaConfig(props)) + + val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot) + assertTrue(pathExists) - override def tearDown() { server.shutdown() Utils.rm(server.config.logDirs) - super.tearDown() } - def testBrokerCreatesZKChroot { - val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot) - assertTrue(pathExists) + def testConflictBrokerRegistration { + // Try starting a broker with the a conflicting broker id. + // This shouldn't affect the existing broker registration. + + val brokerId = 0 + val props1 = TestUtils.createBrokerConfig(brokerId) + val server1 = TestUtils.createServer(new KafkaConfig(props1)) + val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 + + val props2 = TestUtils.createBrokerConfig(brokerId) + try { + TestUtils.createServer(new KafkaConfig(props2)) + fail("Registering a broker with a conflicting id should fail") + } catch { + case e : RuntimeException => + // this is expected + } + + // broker registration shouldn't change + assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1) + + server1.shutdown() + Utils.rm(server1.config.logDirs) } } \ No newline at end of file