|
|
|
@ -589,28 +589,35 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
@@ -589,28 +589,35 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
|
|
|
|
|
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { |
|
|
|
|
CoreUtils.swallow(controlledShutdown()) |
|
|
|
|
brokerState.newState(BrokerShuttingDown) |
|
|
|
|
|
|
|
|
|
if (socketServer != null) |
|
|
|
|
CoreUtils.swallow(socketServer.shutdown()) |
|
|
|
|
if (requestHandlerPool != null) |
|
|
|
|
CoreUtils.swallow(requestHandlerPool.shutdown()) |
|
|
|
|
|
|
|
|
|
CoreUtils.swallow(kafkaScheduler.shutdown()) |
|
|
|
|
|
|
|
|
|
if (apis != null) |
|
|
|
|
CoreUtils.swallow(apis.close()) |
|
|
|
|
CoreUtils.swallow(authorizer.foreach(_.close())) |
|
|
|
|
if (replicaManager != null) |
|
|
|
|
CoreUtils.swallow(replicaManager.shutdown()) |
|
|
|
|
if (adminManager != null) |
|
|
|
|
CoreUtils.swallow(adminManager.shutdown()) |
|
|
|
|
|
|
|
|
|
if (transactionCoordinator != null) |
|
|
|
|
CoreUtils.swallow(transactionCoordinator.shutdown()) |
|
|
|
|
if (groupCoordinator != null) |
|
|
|
|
CoreUtils.swallow(groupCoordinator.shutdown()) |
|
|
|
|
|
|
|
|
|
if (replicaManager != null) |
|
|
|
|
CoreUtils.swallow(replicaManager.shutdown()) |
|
|
|
|
if (logManager != null) |
|
|
|
|
CoreUtils.swallow(logManager.shutdown()) |
|
|
|
|
|
|
|
|
|
if (kafkaController != null) |
|
|
|
|
CoreUtils.swallow(kafkaController.shutdown()) |
|
|
|
|
if (zkUtils != null) |
|
|
|
|
CoreUtils.swallow(zkUtils.close()) |
|
|
|
|
|
|
|
|
|
if (metrics != null) |
|
|
|
|
CoreUtils.swallow(metrics.close()) |
|
|
|
|
|
|
|
|
|