diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 11d22fd396a..645080f7641 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -195,7 +195,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = { this.brokerInfo = newBrokerInfo - zkClient.updateBrokerInfoInZk(newBrokerInfo) + zkClient.updateBrokerInfo(newBrokerInfo) } private[kafka] def enableDefaultUncleanLeaderElection(): Unit = { @@ -1208,7 +1208,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } try { - zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp)) + zkClient.registerController(config.brokerId, timestamp) info(s"${config.brokerId} successfully elected as the controller") activeControllerId = config.brokerId onControllerFailover() @@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti override def state: ControllerState = ControllerState.ControllerChange override def process(): Unit = { - zkClient.registerBrokerInZk(brokerInfo) + zkClient.registerBroker(brokerInfo) Reelect.process() } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f73ede619dd..6c1bb83ba4d 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -254,7 +254,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP replicaManager.startup() val brokerInfo = createBrokerInfo - zkClient.registerBrokerInZk(brokerInfo) + zkClient.registerBroker(brokerInfo) // Now that the broker id is successfully registered, checkpoint it checkpointBrokerId(config.brokerId) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index ec4932ab47b..c45a90f6076 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -79,13 +79,24 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean createResponse.name } - def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = { + def registerBroker(brokerInfo: BrokerInfo): Unit = { val path = brokerInfo.path checkedEphemeralCreate(path, brokerInfo.toJsonBytes) info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}") } - def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { + /** + * Registers a given broker in zookeeper as the controller. + * @param controllerId the id of the broker that is to be registered as the controller. + * @param timestamp the timestamp of the controller election. + * @throws KeeperException if an error is returned by ZooKeeper. + */ + def registerController(controllerId: Int, timestamp: Long): Unit = { + val path = ControllerZNode.path + checkedEphemeralCreate(path, ControllerZNode.encode(controllerId, timestamp)) + } + + def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) val response = retryRequestUntilConnected(setDataRequest) @@ -1509,7 +1520,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean responses } - def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = { + private def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = { val checkedEphemeral = new CheckedEphemeral(path, data) info(s"Creating $path (is it secure? $isSecure)") val code = checkedEphemeral.create() diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 2e8179ce44c..cb261f62c7d 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -687,7 +687,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion, jmxPort = 9192) - zkClient.registerBrokerInZk(brokerInfo) + zkClient.registerBroker(brokerInfo) } class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index f89abb9f063..cf60c780d39 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -674,7 +674,7 @@ object TestUtils extends Logging { val listenerName = ListenerName.forSecurityProtocol(protocol) Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), b.rack) } - brokers.foreach(b => zkClient.registerBrokerInZk(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack), + brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack), ApiVersion.latestVersion, jmxPort = -1))) brokers } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index cc67a010bc4..df009e8136c 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -630,17 +630,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT) val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL) - zkClient.registerBrokerInZk(brokerInfo) + zkClient.registerBroker(brokerInfo) assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1)) assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1)) // Node exists, owned by current session - no error, no update - zkClient.registerBrokerInZk(differentBrokerInfoWithSameId) + zkClient.registerBroker(differentBrokerInfoWithSameId) assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1)) // Other client tries to register broker with same id causes failure, info is not changed in ZK intercept[NodeExistsException] { - otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId) + otherZkClient.registerBroker(differentBrokerInfoWithSameId) } assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1)) } @@ -656,8 +656,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT) val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL) - zkClient.registerBrokerInZk(brokerInfo1) - otherZkClient.registerBrokerInZk(brokerInfo0) + zkClient.registerBroker(brokerInfo1) + otherZkClient.registerBroker(brokerInfo0) assertEquals(Seq(0, 1), zkClient.getSortedBrokerList()) assertEquals( @@ -674,17 +674,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // Updating info of a broker not existing in ZK fails val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT) intercept[NoNodeException]{ - zkClient.updateBrokerInfoInZk(originalBrokerInfo) + zkClient.updateBrokerInfo(originalBrokerInfo) } - zkClient.registerBrokerInZk(originalBrokerInfo) + zkClient.registerBroker(originalBrokerInfo) val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL) - zkClient.updateBrokerInfoInZk(updatedBrokerInfo) + zkClient.updateBrokerInfo(updatedBrokerInfo) assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1)) // Other ZK clients can update info - otherZkClient.updateBrokerInfoInZk(originalBrokerInfo) + otherZkClient.updateBrokerInfo(originalBrokerInfo) assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1)) } @@ -937,7 +937,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // No controller assertEquals(None, zkClient.getControllerId) // Create controller - zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456)) + zkClient.registerController(controllerId = 1, timestamp = 123456) assertEquals(Some(1), zkClient.getControllerId) zkClient.deleteController() assertEquals(None, zkClient.getControllerId)