From 13e265ab3dfd69cdc3709b6f871418bcf1a2221f Mon Sep 17 00:00:00 2001 From: Ivan Yurchenko Date: Tue, 26 Mar 2019 03:50:12 +0200 Subject: [PATCH] KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (#6493) A broken can have more than one instance of ZooKeeperClient. For example, SimpleAclAuthorizer creates a separate ZooKeeperClient instance when configured. This commit makes it possible to optionally specify the name for the ZooKeeperClient instance. The name is specified only for a broker's ZooKeeperClient instances, but not for commands' and tests'. Reviewers: Jun Rao --- .../security/auth/SimpleAclAuthorizer.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../main/scala/kafka/zk/KafkaZkClient.scala | 5 +++-- .../kafka/zookeeper/ZooKeeperClient.scala | 20 +++++++++++++++++-- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 8a0b4a072e4..e39babf8924 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -98,7 +98,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { val time = Time.SYSTEM zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs, - zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") + zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer", name=Some("Simple ACL authorizer")) zkClient.createAclPaths() extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1 diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 8fc5197b7e1..b5ee8cc7907 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -359,7 +359,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP def createZkClient(zkConnect: String, isSecure: Boolean) = KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - config.zkMaxInFlightRequests, time) + config.zkMaxInFlightRequests, time, name = Some("Kafka server")) val chrootIndex = config.zkConnect.indexOf("/") val chrootOption = { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6d8d50443e9..782ec2ab990 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1820,9 +1820,10 @@ object KafkaZkClient { maxInFlightRequests: Int, time: Time, metricGroup: String = "kafka.server", - metricType: String = "SessionExpireListener") = { + metricType: String = "SessionExpireListener", + name: Option[String] = None) = { val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, - time, metricGroup, metricType) + time, metricGroup, metricType, name) new KafkaZkClient(zooKeeperClient, isSecure, time) } diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index ad4da8b5385..c193ff2fbf2 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -44,6 +44,7 @@ import scala.collection.mutable.Set * @param sessionTimeoutMs session timeout in milliseconds * @param connectionTimeoutMs connection timeout in milliseconds * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. + * @param name name of the client instance */ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, @@ -51,8 +52,23 @@ class ZooKeeperClient(connectString: String, maxInFlightRequests: Int, time: Time, metricGroup: String, - metricType: String) extends Logging with KafkaMetricsGroup { - this.logIdent = "[ZooKeeperClient] " + metricType: String, + name: Option[String]) extends Logging with KafkaMetricsGroup { + + def this(connectString: String, + sessionTimeoutMs: Int, + connectionTimeoutMs: Int, + maxInFlightRequests: Int, + time: Time, + metricGroup: String, + metricType: String) = { + this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None) + } + + this.logIdent = name match { + case Some(n) => s"[ZooKeeperClient $n] " + case _ => "[ZooKeeperClient] " + } private val initializationLock = new ReentrantReadWriteLock() private val isConnectedOrExpiredLock = new ReentrantLock() private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()