Browse Source

KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (#6493)

A broken can have more than one instance of ZooKeeperClient. For example, SimpleAclAuthorizer creates a separate ZooKeeperClient instance when configured.

This commit makes it possible to optionally specify the name for the ZooKeeperClient instance. The name is specified only for a broker's ZooKeeperClient instances, but not for commands' and tests'.

Reviewers: Jun Rao <junrao@gmail.com>
pull/6505/head
Ivan Yurchenko 6 years ago committed by Jun Rao
parent
commit
13e265ab3d
  1. 2
      core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
  2. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  3. 5
      core/src/main/scala/kafka/zk/KafkaZkClient.scala
  4. 20
      core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala

2
core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala

@ -98,7 +98,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { @@ -98,7 +98,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val time = Time.SYSTEM
zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer", name=Some("Simple ACL authorizer"))
zkClient.createAclPaths()
extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -359,7 +359,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP @@ -359,7 +359,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
def createZkClient(zkConnect: String, isSecure: Boolean) =
KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
config.zkMaxInFlightRequests, time)
config.zkMaxInFlightRequests, time, name = Some("Kafka server"))
val chrootIndex = config.zkConnect.indexOf("/")
val chrootOption = {

5
core/src/main/scala/kafka/zk/KafkaZkClient.scala

@ -1820,9 +1820,10 @@ object KafkaZkClient { @@ -1820,9 +1820,10 @@ object KafkaZkClient {
maxInFlightRequests: Int,
time: Time,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener") = {
metricType: String = "SessionExpireListener",
name: Option[String] = None) = {
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
time, metricGroup, metricType)
time, metricGroup, metricType, name)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}

20
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala

@ -44,6 +44,7 @@ import scala.collection.mutable.Set @@ -44,6 +44,7 @@ import scala.collection.mutable.Set
* @param sessionTimeoutMs session timeout in milliseconds
* @param connectionTimeoutMs connection timeout in milliseconds
* @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
* @param name name of the client instance
*/
class ZooKeeperClient(connectString: String,
sessionTimeoutMs: Int,
@ -51,8 +52,23 @@ class ZooKeeperClient(connectString: String, @@ -51,8 +52,23 @@ class ZooKeeperClient(connectString: String,
maxInFlightRequests: Int,
time: Time,
metricGroup: String,
metricType: String) extends Logging with KafkaMetricsGroup {
this.logIdent = "[ZooKeeperClient] "
metricType: String,
name: Option[String]) extends Logging with KafkaMetricsGroup {
def this(connectString: String,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
metricGroup: String,
metricType: String) = {
this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None)
}
this.logIdent = name match {
case Some(n) => s"[ZooKeeperClient $n] "
case _ => "[ZooKeeperClient] "
}
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()

Loading…
Cancel
Save