Browse Source

kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede

pull/51/head
Jun Rao 10 years ago
parent
commit
41189ea560
  1. 7
      core/src/main/scala/kafka/server/KafkaHealthcheck.scala
  2. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  3. 6
      core/src/main/scala/kafka/utils/ZkUtils.scala
  4. 42
      core/src/test/scala/unit/kafka/server/ServerStartupTest.scala

7
core/src/main/scala/kafka/server/KafkaHealthcheck.scala

@ -39,17 +39,12 @@ class KafkaHealthcheck(private val brokerId: Int,
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener val sessionExpireListener = new SessionExpireListener
def startup() { def startup() {
zkClient.subscribeStateChanges(sessionExpireListener) zkClient.subscribeStateChanges(sessionExpireListener)
register() register()
} }
def shutdown() {
zkClient.unsubscribeStateChanges(sessionExpireListener)
ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
}
/** /**
* Register this broker as "alive" in zookeeper * Register this broker as "alive" in zookeeper
*/ */

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
if (canShutdown) { if (canShutdown) {
Utils.swallow(controlledShutdown()) Utils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown) brokerState.newState(BrokerShuttingDown)
if(kafkaHealthcheck != null)
Utils.swallow(kafkaHealthcheck.shutdown())
if(socketServer != null) if(socketServer != null)
Utils.swallow(socketServer.shutdown()) Utils.swallow(socketServer.shutdown())
if(requestHandlerPool != null) if(requestHandlerPool != null)

6
core/src/main/scala/kafka/utils/ZkUtils.scala

@ -189,12 +189,6 @@ object ZkUtils extends Logging {
info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
} }
def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
deletePath(zkClient, brokerIdPath)
info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
}
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic) val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition topicDirs.consumerOwnerDir + "/" + partition

42
core/src/test/scala/unit/kafka/server/ServerStartupTest.scala

@ -26,26 +26,44 @@ import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._ import junit.framework.Assert._
class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness { class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
var server : KafkaServer = null
val brokerId = 0
val zookeeperChroot = "/kafka-chroot-for-unittest"
override def setUp() { def testBrokerCreatesZKChroot {
super.setUp() val brokerId = 0
val zookeeperChroot = "/kafka-chroot-for-unittest"
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
val zooKeeperConnect = props.get("zookeeper.connect") val zooKeeperConnect = props.get("zookeeper.connect")
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot) props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
server = TestUtils.createServer(new KafkaConfig(props)) val server = TestUtils.createServer(new KafkaConfig(props))
}
val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
assertTrue(pathExists)
override def tearDown() {
server.shutdown() server.shutdown()
Utils.rm(server.config.logDirs) Utils.rm(server.config.logDirs)
super.tearDown()
} }
def testBrokerCreatesZKChroot { def testConflictBrokerRegistration {
val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot) // Try starting a broker with the a conflicting broker id.
assertTrue(pathExists) // This shouldn't affect the existing broker registration.
val brokerId = 0
val props1 = TestUtils.createBrokerConfig(brokerId)
val server1 = TestUtils.createServer(new KafkaConfig(props1))
val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
val props2 = TestUtils.createBrokerConfig(brokerId)
try {
TestUtils.createServer(new KafkaConfig(props2))
fail("Registering a broker with a conflicting id should fail")
} catch {
case e : RuntimeException =>
// this is expected
}
// broker registration shouldn't change
assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
server1.shutdown()
Utils.rm(server1.config.logDirs)
} }
} }
Loading…
Cancel
Save