diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index adaaabf946f..e705d41b16d 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -210,9 +210,7 @@ class SocketServer(val config: KafkaConfig, orderedAcceptors.foreach { acceptor => val endpoint = acceptor.endPoint debug(s"Wait for authorizer to complete start up on listener ${endpoint.listener}") - authorizerFutures.get(endpoint).foreach { future => - future.join() - } + waitForAuthorizerFuture(acceptor, authorizerFutures) debug(s"Start processors on listener ${endpoint.listener}") acceptor.startProcessors(DataPlaneThreadPrefix) } @@ -226,7 +224,7 @@ class SocketServer(val config: KafkaConfig, */ def startControlPlaneProcessor(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = synchronized { controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor => - authorizerFutures.get(controlPlaneAcceptor.endPoint).foreach(_.get) + waitForAuthorizerFuture(controlPlaneAcceptor, authorizerFutures) controlPlaneAcceptor.startProcessors(ControlPlaneThreadPrefix) info(s"Started control-plane processor for the control-plane acceptor") } @@ -380,6 +378,15 @@ class SocketServer(val config: KafkaConfig, } } + private def waitForAuthorizerFuture(acceptor: Acceptor, + authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = { + //we can't rely on authorizerFutures.get() due to ephemeral ports. Get the future using listener name + authorizerFutures.foreach { case (endpoint, future) => + if (endpoint.listener() == acceptor.endPoint.listener()) + future.join() + } + } + // `protected` for test usage protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7072c94f765..507e117f5d2 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -202,6 +202,10 @@ class SocketServerTest { val config = KafkaConfig.fromProps(testProps) val testableServer = new TestableSocketServer(config) testableServer.startup(startupProcessors = false) + val updatedEndPoints = config.advertisedListeners.map { endpoint => + endpoint.copy(port = testableServer.boundPort(endpoint.listenerName)) + }.map(_.asInstanceOf[Endpoint]) + val externalReadyFuture = new CompletableFuture[Void]() val executor = Executors.newSingleThreadExecutor() @@ -221,7 +225,7 @@ class SocketServerTest { sendAndReceiveControllerRequest(socket1, testableServer) val externalListener = new ListenerName("EXTERNAL") - val externalEndpoint = new Endpoint(externalListener.value, SecurityProtocol.PLAINTEXT, "localhost", 0) + val externalEndpoint = updatedEndPoints.find(e => e.listener() == externalListener.value).get val futures = Map(externalEndpoint -> externalReadyFuture) val startFuture = executor.submit(CoreUtils.runnable(testableServer.startDataPlaneProcessors(futures))) TestUtils.waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), "Inter-broker listener not started")