
MINOR: some minor cleanups for process startup

Move registration of the linux-disk-read-bytes and linux-disk-write-bytes metrics into a common
function in LinuxIoMetricsCollector.scala, and register them for isolated controller processes as
well. (A standalone sketch of this registration pattern follows the LinuxIoMetricsCollector.scala
diff below.)

Create authorizers using AuthorizerUtils.configureAuthorizer and close them using
AuthorizerUtils.closeAuthorizer. This ensures that we configure the authorizers immediately after
creating them.
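
A minimal, self-contained sketch of the lifecycle pattern this commit introduces: create the
authorizer, configure it in the same step, and close it without letting exceptions escape during
shutdown. The Authorizer and KafkaConfig types below are simplified stand-ins for the real Kafka
classes, not the actual API:

trait Authorizer extends AutoCloseable {
  def configure(configs: java.util.Map[String, AnyRef]): Unit
}

final class KafkaConfig(
  val originals: java.util.Map[String, AnyRef],
  authorizerClassName: Option[String]
) {
  // Stand-in for KafkaConfig.createNewAuthorizer(): instantiate, but do not configure.
  def createNewAuthorizer(): Option[Authorizer] =
    authorizerClassName.map { name =>
      Class.forName(name).getDeclaredConstructor().newInstance().asInstanceOf[Authorizer]
    }
}

object AuthorizerLifecycleSketch {
  // Configure immediately after creation, so no caller can ever observe an
  // instantiated-but-unconfigured authorizer.
  def configureAuthorizer(config: KafkaConfig): Option[Authorizer] = {
    val authorizerOpt = config.createNewAuthorizer()
    authorizerOpt.foreach(_.configure(config.originals))
    authorizerOpt
  }

  // Close without letting an exception abort the rest of shutdown
  // (approximating what CoreUtils.swallow does in the real code).
  def closeAuthorizer(authorizer: Authorizer): Unit =
    try authorizer.close()
    catch { case e: Throwable => System.err.println(s"error closing authorizer: $e") }
}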
Branch: pull/13900/head
Author: Colin P. McCabe · 1 year ago
Commit: 72737ab522
Changed files:
  20  core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
  17  core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
   9  core/src/main/scala/kafka/server/BrokerServer.scala
  15  core/src/main/scala/kafka/server/ControllerServer.scala
   6  core/src/main/scala/kafka/server/KafkaServer.scala
  12  core/src/main/scala/kafka/server/Server.scala

core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala (20 lines changed)

@@ -17,9 +17,11 @@
package kafka.metrics
import java.nio.file.{Files, Paths}
import com.yammer.metrics.core.{Gauge, MetricsRegistry}
import java.nio.file.{Files, Paths}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.slf4j.Logger
import scala.jdk.CollectionConverters._
@@ -94,6 +96,22 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
      false
    }
  }

  def maybeRegisterMetrics(registry: MetricsRegistry): Unit = {
    def registerGauge(name: String, gauge: Gauge[Long]): Unit = {
      val metricName = KafkaYammerMetrics.getMetricName(
        "kafka.server",
        "KafkaServer",
        name
      )
      registry.newGauge(metricName, gauge)
    }
    if (usable()) {
      registerGauge("linux-disk-read-bytes", () => readBytes())
      registerGauge("linux-disk-write-bytes", () => writeBytes())
    }
  }
}
object LinuxIoMetricsCollector {
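
For reference, the gauge registration above can be reproduced standalone against a Yammer
MetricsRegistry (assuming metrics-core 2.x on the classpath and Scala 2.12+ for the SAM
conversion of () => Long to Gauge[Long]). This sketch swaps the Kafka-internal
KafkaYammerMetrics.getMetricName for a plain MetricName, and the byte-count suppliers are
placeholders rather than the real /proc readers:

import com.yammer.metrics.core.{Gauge, MetricName, MetricsRegistry}

object GaugeRegistrationSketch {
  // Placeholders; the real collector parses read_bytes/write_bytes from /proc.
  private def readBytes(): Long = 0L
  private def writeBytes(): Long = 0L

  def registerAll(registry: MetricsRegistry): Unit = {
    def registerGauge(name: String, gauge: Gauge[Long]): Unit = {
      // Plain MetricName instead of KafkaYammerMetrics.getMetricName, to keep
      // the sketch free of Kafka internals.
      val metricName = new MetricName("kafka.server", "KafkaServer", name)
      registry.newGauge(metricName, gauge)
    }
    registerGauge("linux-disk-read-bytes", () => readBytes())
    registerGauge("linux-disk-write-bytes", () => writeBytes())
  }

  def main(args: Array[String]): Unit = {
    val registry = new MetricsRegistry()
    registerAll(registry)
    println(registry.allMetrics().size()) // prints 2
  }
}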

core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala (17 lines changed)

@@ -18,20 +18,33 @@
package kafka.security.authorizer
import java.net.InetAddress
import kafka.network.RequestChannel.Session
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
object AuthorizerUtils {
object AuthorizerUtils extends Logging {
  def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])

  def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)

  def configureAuthorizer(config: KafkaConfig): Option[Authorizer] = {
    val authorizerOpt = config.createNewAuthorizer()
    authorizerOpt.foreach { authorizer =>
      authorizer.configure(config.originals)
    }
    authorizerOpt
  }

  def closeAuthorizer(authorizer: Authorizer): Unit = {
    CoreUtils.swallow(authorizer.close(), this)
  }

  def sessionToRequestContext(session: Session): AuthorizableRequestContext = {
    new AuthorizableRequestContext {
      override def clientId(): String = ""
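
closeAuthorizer leans on CoreUtils.swallow so that a failing close() is logged rather than
propagated during shutdown. A standalone approximation of that idiom (the helper name and the
logging callback are hypothetical, not the real CoreUtils signature):

object SwallowSketch {
  // Run an action; log any Throwable instead of rethrowing, so one failing
  // close() cannot abort the remaining shutdown steps.
  def swallow(action: => Unit, logError: String => Unit = s => System.err.println(s)): Unit =
    try action
    catch { case e: Throwable => logError(s"swallowed exception: $e") }

  def main(args: Array[String]): Unit = {
    swallow(throw new RuntimeException("close failed")) // logged, not thrown
    println("shutdown continues")
  }
}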

core/src/main/scala/kafka/server/BrokerServer.scala (9 lines changed)

@@ -25,6 +25,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.metadata.{BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.NetworkClient
@@ -75,6 +76,8 @@ class BrokerServer(
  // Get raftManager from SharedServer. It will be initialized during startup.
  def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager

  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)

  override def brokerState: BrokerState = Option(lifecycleManager).
    flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
@@ -172,7 +175,6 @@ class BrokerServer(
    sharedServer.startForBroker()
    info("Starting broker")
    config.dynamicConfig.initialize(zkClientOpt = None)
    lifecycleManager = new BrokerLifecycleManager(config,
@@ -368,8 +370,7 @@ class BrokerServer(
    }

    // Create and initialize an authorizer if one is configured.
    authorizer = config.createNewAuthorizer()
    authorizer.foreach(_.configure(config.originals))
    authorizer = AuthorizerUtils.configureAuthorizer(config)

    val fetchManager = new FetchManager(Time.SYSTEM,
      new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
@@ -568,7 +569,7 @@ class BrokerServer(
      CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
      if (controlPlaneRequestProcessor != null)
        CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
      CoreUtils.swallow(authorizer.foreach(_.close()), this)
      authorizer.foreach(AuthorizerUtils.closeAuthorizer)

      /**
       * We must shutdown the scheduler early because otherwise, the scheduler could touch other

core/src/main/scala/kafka/server/ControllerServer.scala (15 lines changed)

@@ -22,6 +22,7 @@ import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.QuotaFactory.QuotaManagers
@@ -145,15 +146,10 @@ class ControllerServer(
    metricsGroup.newGauge("ClusterId", () => clusterId)
    metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)

    linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
    if (linuxIoMetricsCollector.usable()) {
      metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
      metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
    }
    Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)

    val javaListeners = config.controllerListeners.map(_.toJava).asJava
    authorizer = config.createNewAuthorizer()
    authorizer.foreach(_.configure(config.originals))
    authorizer = AuthorizerUtils.configureAuthorizer(config)

    val endpointReadyFutures = {
      val builder = new EndpointReadyFutures.Builder()
@@ -288,7 +284,8 @@ class ControllerServer(
        sharedServer.metaProps,
        controllerNodes.asScala.toSeq,
        apiVersionManager)
      controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
      controllerApisHandlerPool = new KafkaRequestHandlerPool(
        config.nodeId,
        socketServer.dataPlaneRequestChannel,
        controllerApis,
        time,
@@ -397,7 +394,7 @@ class ControllerServer(
        controller.close()
      if (quorumControllerMetrics != null)
        CoreUtils.swallow(quorumControllerMetrics.close(), this)
      CoreUtils.swallow(authorizer.foreach(_.close()), this)
      authorizer.foreach(AuthorizerUtils.closeAuthorizer(_))
      createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
      alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
      socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))

core/src/main/scala/kafka/server/KafkaServer.scala (6 lines changed)

@@ -28,6 +28,7 @@ import kafka.metrics.KafkaMetricsReporter
import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
@@ -488,8 +489,7 @@ class KafkaServer(
        )

        /* Get the authorizer and initialize it if one is specified. */
        authorizer = config.createNewAuthorizer()
        authorizer.foreach(_.configure(config.originals))
        authorizer = AuthorizerUtils.configureAuthorizer(config)

        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
@@ -912,7 +912,7 @@ class KafkaServer(
        CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
        if (controlPlaneRequestProcessor != null)
          CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
        CoreUtils.swallow(authorizer.foreach(_.close()), this)
        authorizer.foreach(AuthorizerUtils.closeAuthorizer)
        if (adminManager != null)
          CoreUtils.swallow(adminManager.shutdown(), this)

core/src/main/scala/kafka/server/Server.scala (12 lines changed)

@@ -16,9 +16,12 @@
*/
package kafka.server
import kafka.metrics.LinuxIoMetricsCollector
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.metrics.{KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.slf4j.Logger
import java.util
import java.util.concurrent.TimeUnit
@@ -45,6 +48,15 @@ object Server {
    buildMetrics(config, time, metricsContext)
  }

  def maybeRegisterLinuxMetrics(
    config: KafkaConfig,
    time: Time,
    logger: Logger
  ): Unit = {
    val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger)
    linuxIoMetricsCollector.maybeRegisterMetrics(KafkaYammerMetrics.defaultRegistry())
  }

  private def buildMetrics(
    config: KafkaConfig,
    time: Time,
