Browse Source

MINOR: Add registerController method to KafkaZkClient (#4598)

And change KafkaController to use the newly introduced method.
Also remove redundant `InZk` postfixes from `registerBrokerInZk` and
`updateBrokerInfoInZk`.

As `checkedEphemeralCreate` is not used outside of `KafkaZkClient`
any longer, reduce its visibility.

ControllerIntegrationTest already covers this functionality well, it validates the
refactor.

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/5412/merge
Sandor Murakozi 6 years ago committed by Ismael Juma
parent
commit
591954e2e5
  1. 6
      core/src/main/scala/kafka/controller/KafkaController.scala
  2. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  3. 17
      core/src/main/scala/kafka/zk/KafkaZkClient.scala
  4. 2
      core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
  5. 2
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  6. 20
      core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala

6
core/src/main/scala/kafka/controller/KafkaController.scala

@ -195,7 +195,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti @@ -195,7 +195,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
this.brokerInfo = newBrokerInfo
zkClient.updateBrokerInfoInZk(newBrokerInfo)
zkClient.updateBrokerInfo(newBrokerInfo)
}
private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
@ -1208,7 +1208,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti @@ -1208,7 +1208,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
try {
zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
zkClient.registerController(config.brokerId, timestamp)
info(s"${config.brokerId} successfully elected as the controller")
activeControllerId = config.brokerId
onControllerFailover()
@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti @@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
override def state: ControllerState = ControllerState.ControllerChange
override def process(): Unit = {
zkClient.registerBrokerInZk(brokerInfo)
zkClient.registerBroker(brokerInfo)
Reelect.process()
}
}

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -254,7 +254,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP @@ -254,7 +254,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
replicaManager.startup()
val brokerInfo = createBrokerInfo
zkClient.registerBrokerInZk(brokerInfo)
zkClient.registerBroker(brokerInfo)
// Now that the broker id is successfully registered, checkpoint it
checkpointBrokerId(config.brokerId)

17
core/src/main/scala/kafka/zk/KafkaZkClient.scala

@ -79,13 +79,24 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean @@ -79,13 +79,24 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
createResponse.name
}
def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = {
def registerBroker(brokerInfo: BrokerInfo): Unit = {
val path = brokerInfo.path
checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
}
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
/**
* Registers a given broker in zookeeper as the controller.
* @param controllerId the id of the broker that is to be registered as the controller.
* @param timestamp the timestamp of the controller election.
* @throws KeeperException if an error is returned by ZooKeeper.
*/
def registerController(controllerId: Int, timestamp: Long): Unit = {
val path = ControllerZNode.path
checkedEphemeralCreate(path, ControllerZNode.encode(controllerId, timestamp))
}
def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
val response = retryRequestUntilConnected(setDataRequest)
@ -1509,7 +1520,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean @@ -1509,7 +1520,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
responses
}
def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
private def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
val checkedEphemeral = new CheckedEphemeral(path, data)
info(s"Creating $path (is it secure? $isSecure)")
val code = checkedEphemeral.create()

2
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala

@ -687,7 +687,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { @@ -687,7 +687,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val securityProtocol = SecurityProtocol.PLAINTEXT
val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion, jmxPort = 9192)
zkClient.registerBrokerInZk(brokerInfo)
zkClient.registerBroker(brokerInfo)
}
class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {

2
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -674,7 +674,7 @@ object TestUtils extends Logging { @@ -674,7 +674,7 @@ object TestUtils extends Logging {
val listenerName = ListenerName.forSecurityProtocol(protocol)
Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), b.rack)
}
brokers.foreach(b => zkClient.registerBrokerInZk(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
brokers.foreach(b => zkClient.registerBroker(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
ApiVersion.latestVersion, jmxPort = -1)))
brokers
}

20
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala

@ -630,17 +630,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -630,17 +630,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
zkClient.registerBrokerInZk(brokerInfo)
zkClient.registerBroker(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
// Node exists, owned by current session - no error, no update
zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
zkClient.registerBroker(differentBrokerInfoWithSameId)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
// Other client tries to register broker with same id causes failure, info is not changed in ZK
intercept[NodeExistsException] {
otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
otherZkClient.registerBroker(differentBrokerInfoWithSameId)
}
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
}
@ -656,8 +656,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -656,8 +656,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
zkClient.registerBrokerInZk(brokerInfo1)
otherZkClient.registerBrokerInZk(brokerInfo0)
zkClient.registerBroker(brokerInfo1)
otherZkClient.registerBroker(brokerInfo0)
assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
assertEquals(
@ -674,17 +674,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -674,17 +674,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// Updating info of a broker not existing in ZK fails
val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
intercept[NoNodeException]{
zkClient.updateBrokerInfoInZk(originalBrokerInfo)
zkClient.updateBrokerInfo(originalBrokerInfo)
}
zkClient.registerBrokerInZk(originalBrokerInfo)
zkClient.registerBroker(originalBrokerInfo)
val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
zkClient.updateBrokerInfo(updatedBrokerInfo)
assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
// Other ZK clients can update info
otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
otherZkClient.updateBrokerInfo(originalBrokerInfo)
assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
}
@ -937,7 +937,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -937,7 +937,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
zkClient.registerController(controllerId = 1, timestamp = 123456)
assertEquals(Some(1), zkClient.getControllerId)
zkClient.deleteController()
assertEquals(None, zkClient.getControllerId)

Loading…
Cancel
Save