Browse Source

MINOR: Update authorizer start-up check to handle end point with ephemeral port

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #7350 from omkreddy/checkstartup
pull/7368/merge
Manikumar Reddy 5 years ago committed by Manikumar Reddy
parent
commit
fe2797a9fa
  1. 15
      core/src/main/scala/kafka/network/SocketServer.scala
  2. 6
      core/src/test/scala/unit/kafka/network/SocketServerTest.scala

15
core/src/main/scala/kafka/network/SocketServer.scala

@ -210,9 +210,7 @@ class SocketServer(val config: KafkaConfig, @@ -210,9 +210,7 @@ class SocketServer(val config: KafkaConfig,
orderedAcceptors.foreach { acceptor =>
val endpoint = acceptor.endPoint
debug(s"Wait for authorizer to complete start up on listener ${endpoint.listener}")
authorizerFutures.get(endpoint).foreach { future =>
future.join()
}
waitForAuthorizerFuture(acceptor, authorizerFutures)
debug(s"Start processors on listener ${endpoint.listener}")
acceptor.startProcessors(DataPlaneThreadPrefix)
}
@ -226,7 +224,7 @@ class SocketServer(val config: KafkaConfig, @@ -226,7 +224,7 @@ class SocketServer(val config: KafkaConfig,
*/
def startControlPlaneProcessor(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = synchronized {
controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
authorizerFutures.get(controlPlaneAcceptor.endPoint).foreach(_.get)
waitForAuthorizerFuture(controlPlaneAcceptor, authorizerFutures)
controlPlaneAcceptor.startProcessors(ControlPlaneThreadPrefix)
info(s"Started control-plane processor for the control-plane acceptor")
}
@ -380,6 +378,15 @@ class SocketServer(val config: KafkaConfig, @@ -380,6 +378,15 @@ class SocketServer(val config: KafkaConfig,
}
}
private def waitForAuthorizerFuture(acceptor: Acceptor,
authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
//we can't rely on authorizerFutures.get() due to ephemeral ports. Get the future using listener name
authorizerFutures.foreach { case (endpoint, future) =>
if (endpoint.listener() == acceptor.endPoint.listener())
future.join()
}
}
// `protected` for test usage
protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {

6
core/src/test/scala/unit/kafka/network/SocketServerTest.scala

@ -202,6 +202,10 @@ class SocketServerTest { @@ -202,6 +202,10 @@ class SocketServerTest {
val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config)
testableServer.startup(startupProcessors = false)
val updatedEndPoints = config.advertisedListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.asInstanceOf[Endpoint])
val externalReadyFuture = new CompletableFuture[Void]()
val executor = Executors.newSingleThreadExecutor()
@ -221,7 +225,7 @@ class SocketServerTest { @@ -221,7 +225,7 @@ class SocketServerTest {
sendAndReceiveControllerRequest(socket1, testableServer)
val externalListener = new ListenerName("EXTERNAL")
val externalEndpoint = new Endpoint(externalListener.value, SecurityProtocol.PLAINTEXT, "localhost", 0)
val externalEndpoint = updatedEndPoints.find(e => e.listener() == externalListener.value).get
val futures = Map(externalEndpoint -> externalReadyFuture)
val startFuture = executor.submit(CoreUtils.runnable(testableServer.startDataPlaneProcessors(futures)))
TestUtils.waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), "Inter-broker listener not started")

Loading…
Cancel
Save