From 3ec2ca5e334ef42683263ed50c718c62917046af Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Thu, 14 Mar 2019 16:18:52 -0700
Subject: [PATCH] KAFKA-7730; Limit number of active connections per listener in brokers (KIP-402)

Adds a new listener config `max.connections` to limit the number of active
connections on each listener. The config may be prefixed with the listener
prefix to set a limit for an individual listener. This limit may be dynamically
reconfigured without restarting the broker.

This is one of the PRs for KIP-402
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors).
Note that this is currently built on top of PR #6022.

Author: Rajini Sivaram

Reviewers: Gwen Shapira

Closes #6034 from rajinisivaram/KAFKA-7730-max-connections
---
 .../apache/kafka/common/network/Selector.java |  22 ++
 .../kafka/common/network/SelectorTest.java    |  30 +++
 .../scala/kafka/network/SocketServer.scala    | 209 +++++++++++++++--
 .../kafka/server/DynamicBrokerConfig.scala    |  40 ++--
 .../main/scala/kafka/server/KafkaConfig.scala |  19 +-
 .../network/DynamicConnectionQuotaTest.scala  | 217 +++++++++++++-----
 .../server/DynamicBrokerConfigTest.scala      |   6 +
 7 files changed, 441 insertions(+), 102 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index e431e2788d8..b3497973afd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -919,6 +919,28 @@ public class Selector implements Selectable, AutoCloseable {
         return closingChannels.get(id);
     }
 
+    /**
+     * Returns the lowest priority channel chosen using the following sequence:
+     *   1) If one or more channels are in closing state, return any one of them
+     *   2) If idle expiry manager is enabled, return the least recently updated channel
+     *   3) Otherwise return any of the channels
+     *
+     * This method is used to close a channel to accommodate a new channel on the inter-broker listener
+     * when the broker-wide `max.connections` limit is enabled.
+     */
+    public KafkaChannel lowestPriorityChannel() {
+        KafkaChannel channel = null;
+        if (!closingChannels.isEmpty()) {
+            channel = closingChannels.values().iterator().next();
+        } else if (idleExpiryManager != null && !idleExpiryManager.lruConnections.isEmpty()) {
+            String channelId = idleExpiryManager.lruConnections.keySet().iterator().next();
+            channel = channel(channelId);
+        } else if (!channels.isEmpty()) {
+            channel = channels.values().iterator().next();
+        }
+        return channel;
+    }
+
     /**
      * Get the channel associated with selectionKey
     */
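For context on step 2 above: `lruConnections` in the selector's idle-expiry manager is iterated from least- to most-recently updated, which is why taking the first key yields the LRU channel. This relies on the map being an access-ordered `java.util.LinkedHashMap`, as in `Selector.IdleExpiryManager`. A minimal Scala sketch of that property (names here are illustrative only):

    import java.util.LinkedHashMap

    // Access-ordered LinkedHashMap: iteration runs from least- to most-recently
    // accessed, so the first key is the LRU entry that lowestPriorityChannel() picks.
    val lru = new LinkedHashMap[String, java.lang.Long](16, 0.75f, true)
    lru.put("ch-a", 1L); lru.put("ch-b", 2L); lru.put("ch-c", 3L)
    lru.get("ch-a") // touching "ch-a" moves it to the most-recently-used end
    assert(lru.keySet.iterator.next == "ch-b") // "ch-b" is now least recently used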
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 7cb89c28de2..8cbada750c2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -688,6 +688,36 @@ public class SelectorTest {
         assertEquals((double) conns, getMetric("connection-count").metricValue());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testLowestPriorityChannel() throws Exception {
+        int conns = 5;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        for (int i = 0; i < conns; i++) {
+            connect(String.valueOf(i), addr);
+        }
+        assertNotNull(selector.lowestPriorityChannel());
+        for (int i = conns - 1; i >= 0; i--) {
+            if (i != 2)
+                assertEquals("", blockingRequest(String.valueOf(i), ""));
+            time.sleep(10);
+        }
+        assertEquals("2", selector.lowestPriorityChannel().id());
+
+        Field field = Selector.class.getDeclaredField("closingChannels");
+        field.setAccessible(true);
+        Map<String, KafkaChannel> closingChannels = (Map<String, KafkaChannel>) field.get(selector);
+        closingChannels.put("3", selector.channel("3"));
+        assertEquals("3", selector.lowestPriorityChannel().id());
+        closingChannels.remove("3");
+
+        for (int i = 0; i < conns; i++) {
+            selector.close(String.valueOf(i));
+        }
+        assertNull(selector.lowestPriorityChannel());
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d8c4a74cdd2..e06cee75ffe 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net._
 import java.nio.channels._
 import java.nio.channels.{Selector => NSelector}
+import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.function.Supplier
@@ -32,15 +33,16 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingRespo
 import kafka.network.Processor._
 import kafka.network.SocketServer._
 import kafka.security.CredentialProvider
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils._
+import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.{KafkaException, Reconfigurable}
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
 import org.apache.kafka.common.metrics.stats.Total
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
-import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -70,7 +72,11 @@ import scala.util.control.ControlThrowable
 *     Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
 *     1 Handler thread that handles requests and produce responses back to the processor thread for writing.
 */
-class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
+class SocketServer(val config: KafkaConfig,
+                   val metrics: Metrics,
+                   val time: Time,
+                   val credentialProvider: CredentialProvider)
+  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
@@ -109,7 +115,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   */
  def startup(startupProcessors: Boolean = true) {
    this.synchronized {
-      connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
+      connectionQuotas = new ConnectionQuotas(config, time)
      createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
      if (startupProcessors) {
@@ -212,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
  private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
    endpoints.foreach { endpoint =>
+      connectionQuotas.addListener(config, endpoint.listenerName)
      val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
@@ -223,6 +230,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
    endpointOpt.foreach { endpoint =>
+      connectionQuotas.addListener(config, endpoint.listenerName)
      val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
@@ -324,18 +332,33 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
  def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
    info(s"Removing data-plane listeners for endpoints $listenersRemoved")
    listenersRemoved.foreach { endpoint =>
+      connectionQuotas.removeListener(config, endpoint.listenerName)
      dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
    }
  }
 
-  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
-    info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
-    connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+  override def reconfigurableConfigs: Set[String] = SocketServer.ReconfigurableConfigs
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
   }
 
-  def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]): Unit = {
-    info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
-    connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val maxConnectionsPerIp = newConfig.maxConnectionsPerIp
+    if (maxConnectionsPerIp != oldConfig.maxConnectionsPerIp) {
+      info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
+      connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+    }
+    val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
+    if (maxConnectionsPerIpOverrides != oldConfig.maxConnectionsPerIpOverrides) {
+      info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
+      connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+    }
+    val maxConnections = newConfig.maxConnections
+    if (maxConnections != oldConfig.maxConnections) {
+      info(s"Updating broker-wide maxConnections: $maxConnections")
+      connectionQuotas.updateBrokerMaxConnections(maxConnections)
+    }
   }
 
  // `protected` for test usage
@@ -373,6 +396,13 @@ object SocketServer {
  val ControlPlaneThreadPrefix = "control-plane"
  val DataPlaneMetricPrefix = ""
  val ControlPlaneMetricPrefix = "ControlPlane"
+
+  val ReconfigurableConfigs = Set(
+    KafkaConfig.MaxConnectionsPerIpProp,
+    KafkaConfig.MaxConnectionsPerIpOverridesProp,
+    KafkaConfig.MaxConnectionsProp)
+
+  val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp)
 }
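`SocketServer` now plugs into dynamic config as a `BrokerReconfigurable`: validate first, then apply only the values that changed between the old and new `KafkaConfig`, as `reconfigure` above does. A condensed sketch of the same pattern, with a hypothetical stand-in config class instead of `KafkaConfig` (the trait shape follows this patch):

    // Hypothetical stand-in for KafkaConfig, to keep the sketch self-contained.
    case class Config(maxConnections: Int)

    trait BrokerReconfigurable {
      def reconfigurableConfigs: Set[String]
      def validateReconfiguration(newConfig: Config): Unit
      def reconfigure(oldConfig: Config, newConfig: Config): Unit
    }

    class ConnectionLimiter extends BrokerReconfigurable {
      @volatile private var maxConnections = Int.MaxValue

      override def reconfigurableConfigs: Set[String] = Set("max.connections")

      override def validateReconfiguration(newConfig: Config): Unit =
        require(newConfig.maxConnections > 0, "max.connections must be positive")

      // Apply only the values that actually changed, as SocketServer.reconfigure does.
      override def reconfigure(oldConfig: Config, newConfig: Config): Unit =
        if (newConfig.maxConnections != oldConfig.maxConnections)
          maxConnections = newConfig.maxConnections
    }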
@@ -427,10 +457,10 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
  /**
   * Close `channel` and decrement the connection count.
   */
-  def close(channel: SocketChannel): Unit = {
+  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
    if (channel != null) {
      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
-      connectionQuotas.dec(channel.socket.getInetAddress)
+      connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
      CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
      CoreUtils.swallow(channel.close(), this, Level.ERROR)
    }
@@ -500,6 +530,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
    var currentProcessorIndex = 0
    while (isRunning) {
      try {
+
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
@@ -508,6 +539,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
          try {
            val key = iter.next
            iter.remove()
+
            if (key.isAcceptable) {
              accept(key).foreach { socketChannel =>
@@ -582,7 +614,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
-      connectionQuotas.inc(socketChannel.socket().getInetAddress)
+      connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
@@ -592,7 +624,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
    } catch {
      case e: TooManyConnectionsException =>
        info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
-        close(socketChannel)
+        close(endPoint.listenerName, socketChannel)
        None
    }
  }
@@ -731,6 +763,7 @@ private[kafka] class Processor(val id: Int,
          processCompletedReceives()
          processCompletedSends()
          processDisconnected()
+          closeExcessConnections()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
@@ -911,13 +944,21 @@ private[kafka] class Processor(val id: Int,
        }.remoteHost
        inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
        // the channel has been closed by the selector but the quotas still need to be updated
-        connectionQuotas.dec(InetAddress.getByName(remoteHost))
+        connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
      } catch {
        case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
      }
    }
  }
 
+  private def closeExcessConnections(): Unit = {
+    if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
+      val channel = selector.lowestPriorityChannel()
+      if (channel != null)
+        close(channel.id)
+    }
+  }
+
  /**
   * Close the connection identified by `connectionId` and decrement the connection count.
   * The channel will be immediately removed from the selector's `channels` or `closingChannels`
@@ -930,7 +971,7 @@ private[kafka] class Processor(val id: Int,
    debug(s"Closing selector connection $connectionId")
    val address = channel.socketAddress
    if (address != null)
-      connectionQuotas.dec(address)
+      connectionQuotas.dec(listenerName, address)
    selector.close(connectionId)
 
    inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
@@ -977,7 +1018,7 @@ private[kafka] class Processor(val id: Int,
      case e: Throwable =>
        val remoteAddress = channel.socket.getRemoteSocketAddress
        // need to close the channel here to avoid a socket leak.
-        close(channel)
+        close(listenerName, channel)
        processException(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
@@ -1058,31 +1099,72 @@ private[kafka] class Processor(val id: Int,
 }
 
-class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
+class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
 
-  @volatile private var defaultMaxConnectionsPerIp = defaultMax
-  @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
+  @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
+  @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
+  @volatile private var brokerMaxConnections = config.maxConnections
  private val counts = mutable.Map[InetAddress, Int]()
 
-  def inc(address: InetAddress) {
+  // Listener counts and configs are synchronized on `counts`
+  private val listenerCounts = mutable.Map[ListenerName, Int]()
+  private val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
+  @volatile private var totalCount = 0
+
+  def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter) {
    counts.synchronized {
+      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+
      val count = counts.getOrElseUpdate(address, 0)
      counts.put(address, count + 1)
+      totalCount += 1
+      if (listenerCounts.contains(listenerName)) {
+        listenerCounts.put(listenerName, listenerCounts(listenerName) + 1)
+      }
      val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp)
      if (count >= max)
        throw new TooManyConnectionsException(address, max)
    }
  }
 
-  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+  private[network] def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
    defaultMaxConnectionsPerIp = maxConnectionsPerIp
  }
 
-  def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
+  private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
    maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
  }
 
-  def dec(address: InetAddress) {
+  private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit = {
+    counts.synchronized {
+      brokerMaxConnections = maxConnections
+      counts.notifyAll()
+    }
+  }
+
+  private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
+    counts.synchronized {
+      if (!maxConnectionsPerListener.contains(listenerName)) {
+        val newListenerQuota = new ListenerConnectionQuota(counts, listenerName)
+        maxConnectionsPerListener.put(listenerName, newListenerQuota)
+        listenerCounts.put(listenerName, 0)
+        config.addReconfigurable(newListenerQuota)
+      }
+      counts.notifyAll()
+    }
+  }
+
+  private[network] def removeListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
+    counts.synchronized {
+      maxConnectionsPerListener.remove(listenerName).foreach { listenerQuota =>
+        listenerCounts.remove(listenerName)
+        counts.notifyAll() // wake up any waiting acceptors to close cleanly
+        config.removeReconfigurable(listenerQuota)
+      }
+    }
+  }
+
+  def dec(listenerName: ListenerName, address: InetAddress) {
    counts.synchronized {
      val count = counts.getOrElse(address,
        throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address"))
@@ -1090,6 +1172,19 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
      if (count == 1)
        counts.remove(address)
      else
        counts.put(address, count - 1)
+
+      if (totalCount <= 0)
+        error(s"Attempted to decrease total connection count for broker with no connections")
+      totalCount -= 1
+
+      if (maxConnectionsPerListener.contains(listenerName)) {
+        val listenerCount = listenerCounts(listenerName)
+        if (listenerCount == 0)
+          error(s"Attempted to decrease connection count for listener $listenerName with no connections")
+        else
+          listenerCounts.put(listenerName, listenerCount - 1)
+      }
+      counts.notifyAll() // wake up any acceptors waiting to process a new connection since listener connection limit was reached
    }
  }
 
@@ -1097,6 +1192,72 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
    counts.getOrElse(address, 0)
  }
 
+  private def waitForConnectionSlot(listenerName: ListenerName,
+                                    acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
+    counts.synchronized {
+      if (!connectionSlotAvailable(listenerName)) {
+        val startNs = time.nanoseconds
+        do {
+          counts.wait()
+        } while (!connectionSlotAvailable(listenerName))
+        acceptorBlockedPercentMeter.mark(time.nanoseconds - startNs)
+      }
+    }
+  }
+
+  // This is invoked in every poll iteration, and we close at most one LRU connection
+  // per iteration if the limit is exceeded
+  def maxConnectionsExceeded(listenerName: ListenerName): Boolean = {
+    totalCount > brokerMaxConnections && !protectedListener(listenerName)
+  }
+
+  private def connectionSlotAvailable(listenerName: ListenerName): Boolean = {
+    if (listenerCounts(listenerName) >= maxListenerConnections(listenerName))
+      false
+    else if (protectedListener(listenerName))
+      true
+    else
+      totalCount < brokerMaxConnections
+  }
+
+  private def protectedListener(listenerName: ListenerName): Boolean =
+    config.interBrokerListenerName == listenerName && config.listeners.size > 1
+
+  private def maxListenerConnections(listenerName: ListenerName): Int =
+    maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
+
+  class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
+    @volatile private var _maxConnections = Int.MaxValue
+    private val listenerPropName = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
+
+    def maxConnections: Int = _maxConnections
+
+    override def listenerName(): ListenerName = listener
+
+    override def configure(configs: util.Map[String, _]): Unit = {
+      _maxConnections = maxConnections(configs)
+    }
+
+    override def reconfigurableConfigs(): util.Set[String] = {
+      SocketServer.ListenerReconfigurableConfigs.asJava
+    }
+
+    override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+      val value = maxConnections(configs)
+      if (value <= 0)
+        throw new ConfigException(s"Invalid $listenerPropName $value")
+    }
+
+    override def reconfigure(configs: util.Map[String, _]): Unit = {
+      lock.synchronized {
+        _maxConnections = maxConnections(configs)
+        lock.notifyAll()
+      }
+    }
+
+    private def maxConnections(configs: util.Map[String, _]): Int = {
+      Option(configs.get(KafkaConfig.MaxConnectionsProp)).map(_.toString.toInt).getOrElse(Int.MaxValue)
+    }
+  }
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too many connections from $ip (maximum = $count)")
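The concurrency in `ConnectionQuotas` above is a classic guarded wait on a single monitor (`counts`): `inc` blocks in `waitForConnectionSlot` while no slot is free, and `dec`, `updateBrokerMaxConnections`, `addListener` and `removeListener` call `notifyAll` so blocked acceptors re-check the condition. A stripped-down sketch of that protocol, with only a total count and none of the per-IP or per-listener bookkeeping:

    // Minimal guarded-wait quota: inc() blocks while the limit is reached,
    // dec() and updateMax() wake blocked callers so they re-check the condition.
    class SlotQuota(initialMax: Int) {
      private val lock = new Object
      private var max = initialMax
      private var total = 0

      def inc(): Unit = lock.synchronized {
        while (total >= max) lock.wait() // re-check after every wake-up
        total += 1
      }

      def dec(): Unit = lock.synchronized {
        total -= 1
        lock.notifyAll() // wake any acceptor blocked in inc()
      }

      def updateMax(newMax: Int): Unit = lock.synchronized {
        max = newMax
        lock.notifyAll() // a raised limit may unblock waiters
      }
    }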
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b02b842b954..4f27226265e 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogConfig, LogManager}
+import kafka.network.SocketServer
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -81,10 +82,11 @@ object DynamicBrokerConfig {
    DynamicThreadPool.ReconfigurableConfigs ++
    Set(KafkaConfig.MetricReporterClassesProp) ++
    DynamicListenerConfig.ReconfigurableConfigs ++
-    DynamicConnectionQuota.ReconfigurableConfigs
+    SocketServer.ReconfigurableConfigs
 
+  private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
  private val PerBrokerConfigs = DynamicSecurityConfigs ++
-    DynamicListenerConfig.ReconfigurableConfigs
+    DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
  private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
 
  private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
@@ -135,7 +137,13 @@ object DynamicBrokerConfig {
  private def perBrokerConfigs(props: Properties): Set[String] = {
    val configNames = props.asScala.keySet
-    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
+    def perBrokerListenerConfig(name: String): Boolean = {
+      name match {
+        case ListenerConfigRegex(baseName) => !ClusterLevelListenerConfigs.contains(baseName)
+        case _ => false
+      }
+    }
+    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(perBrokerListenerConfig)
  }
 
  private def nonDynamicConfigs(props: Properties): Set[String] = {
@@ -224,7 +232,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
    addBrokerReconfigurable(kafkaServer.logManager.cleaner)
    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
    addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
-    addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
+    addBrokerReconfigurable(kafkaServer.socketServer)
  }
 
  def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -789,7 +797,10 @@ object DynamicListenerConfig {
    KafkaConfig.SaslLoginRefreshWindowFactorProp,
    KafkaConfig.SaslLoginRefreshWindowJitterProp,
    KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp,
-    KafkaConfig.SaslLoginRefreshBufferSecondsProp
+    KafkaConfig.SaslLoginRefreshBufferSecondsProp,
+
+    // Connection limit configs
+    KafkaConfig.MaxConnectionsProp
  )
 }
 
@@ -884,24 +895,5 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
  }
 }
 
-object DynamicConnectionQuota {
-  val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp)
-}
-
-class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable {
-
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicConnectionQuota.ReconfigurableConfigs
-  }
-
-  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-  }
-
-  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
-    if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
-      server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
-  }
-}
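The `perBrokerConfigs` change above is what allows `listener.name.<listener>.max.connections` to be set at the cluster level as well as per broker: a listener-prefixed config is treated as per-broker-only unless its base name is in `ClusterLevelListenerConfigs`. A small sketch of that check; the regex is written out here and assumed to have the same shape as `DynamicBrokerConfig.ListenerConfigRegex`:

    // Assumed shape of DynamicBrokerConfig.ListenerConfigRegex.
    val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
    val ClusterLevelListenerConfigs = Set("max.connections")

    def perBrokerListenerConfig(name: String): Boolean = name match {
      case ListenerConfigRegex(baseName) => !ClusterLevelListenerConfigs.contains(baseName)
      case _ => false
    }

    assert(!perBrokerListenerConfig("listener.name.internal.max.connections"))       // may be set cluster-wide
    assert(perBrokerListenerConfig("listener.name.internal.ssl.keystore.location"))  // per-broker only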
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 02e171527e5..8ab7b437550 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -75,6 +75,7 @@ object Defaults {
  val SocketRequestMaxBytes: Int = 100 * 1024 * 1024
  val MaxConnectionsPerIp: Int = Int.MaxValue
  val MaxConnectionsPerIpOverrides: String = ""
+  val MaxConnections: Int = Int.MaxValue
  val ConnectionsMaxIdleMs = 10 * 60 * 1000L
  val RequestTimeoutMs = 30000
  val FailedAuthenticationDelayMs = 100
@@ -293,6 +294,7 @@ object KafkaConfig {
  val SocketRequestMaxBytesProp = "socket.request.max.bytes"
  val MaxConnectionsPerIpProp = "max.connections.per.ip"
  val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
+  val MaxConnectionsProp = "max.connections"
  val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
  val FailedAuthenticationDelayMsProp = "connection.failed.authentication.delay.ms"
  /***************** rack configuration *************/
@@ -570,8 +572,15 @@ object KafkaConfig {
  val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
  val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
  val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides " +
-    "configured using " + MaxConnectionsPerIpOverridesProp + " property"
-  val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is \"hostName:100,127.0.0.1:200\""
+    s"configured using the $MaxConnectionsPerIpOverridesProp property. New connections from the ip address are dropped if the limit is reached."
+  val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. " +
+    "An example value is \"hostName:100,127.0.0.1:200\""
+  val MaxConnectionsDoc = "The maximum number of connections we allow in the broker at any time. This limit is applied in addition " +
+    s"to any per-ip limits configured using $MaxConnectionsPerIpProp. Listener-level limits may also be configured by prefixing the " +
+    s"config name with the listener prefix, for example, listener.name.internal.$MaxConnectionsProp. The broker-wide limit " +
+    "should be configured based on broker capacity, while listener limits should be configured based on application requirements. " +
+    "New connections are blocked if either the listener or the broker limit is reached. Connections on the inter-broker listener are " +
+    "permitted even if the broker-wide limit is reached. In that case, the least recently used connection on another listener is closed."
  val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
  val FailedAuthenticationDelayMsDoc = "Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. " +
    s"This must be configured to be less than $ConnectionsMaxIdleMsProp to prevent connection timeout."
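Both the broker-wide and the listener-prefixed forms of `max.connections` are dynamic, so they can be updated without a restart, which is what `DynamicConnectionQuotaTest` below exercises through `TestUtils.alterConfigs`. A sketch using the public admin API of this era; note that `alterConfigs` replaces the full set of dynamic configs for the resource, the empty resource name targets the cluster-wide default, and the bootstrap address and values are placeholders:

    import java.util.{Arrays, Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    val admin = AdminClient.create(props)
    try {
      // "" as the resource name targets the cluster-wide default broker config.
      val broker = new ConfigResource(ConfigResource.Type.BROKER, "")
      val config = new Config(Arrays.asList(
        new ConfigEntry("max.connections", "1000"),
        new ConfigEntry("listener.name.internal.max.connections", "200")))
      admin.alterConfigs(Collections.singletonMap(broker, config)).all.get
    } finally {
      admin.close()
    }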
@@ -872,6 +881,7 @@ object KafkaConfig {
      .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
      .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
      .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
+      .define(MaxConnectionsProp, INT, Defaults.MaxConnections, atLeast(0), MEDIUM, MaxConnectionsDoc)
      .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
      .define(FailedAuthenticationDelayMsProp, INT, Defaults.FailedAuthenticationDelayMs, atLeast(0), LOW, FailedAuthenticationDelayMsDoc)
@@ -1163,6 +1173,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
  val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
  val maxConnectionsPerIpOverrides: Map[String, Int] =
    getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
+  def maxConnections = getInt(KafkaConfig.MaxConnectionsProp)
  val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
  val failedAuthenticationDelayMs = getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
@@ -1331,6 +1342,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
    dynamicConfig.addReconfigurable(reconfigurable)
  }
 
+  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
+    dynamicConfig.removeReconfigurable(reconfigurable)
+  }
+
  def logRetentionTimeMillis: Long = {
    val millisInMinute = 60L * 1000L
    val millisInHour = 60L * millisInMinute
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 78d9af7bdfc..514d879a1d9 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -21,18 +21,19 @@ package kafka.network
 import java.io.IOException
 import java.net.{InetAddress, Socket}
 import java.util.Properties
+import java.util.concurrent._
 
 import kafka.server.{BaseRequestTest, KafkaConfig}
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.Assert.assertEquals
-import org.junit.{Before, Test}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 
@@ -41,6 +42,9 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
  override def numBrokers = 1
 
  val topic = "test"
+  val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+  val localAddress = InetAddress.getByName("127.0.0.1")
+  var executor: ExecutorService = _
 
  @Before
  override def setUp(): Unit = {
    super.setUp()
@@ -48,76 +52,138 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
    TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
  }
 
-  @Test
-  def testDynamicConnectionQuota(): Unit = {
-    def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
-      new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
+  @After
+  override def tearDown(): Unit = {
+    try {
+      if (executor != null) {
+        executor.shutdownNow()
+        assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS))
+      }
+    } finally {
+      super.tearDown()
    }
+  }
 
-    val socketServer = servers.head.socketServer
-    val localAddress = InetAddress.getByName("127.0.0.1")
-    def connectionCount = socketServer.connectionCount(localAddress)
+  override protected def propertyOverrides(properties: Properties): Unit = {
+    super.propertyOverrides(properties)
+  }
+
+  @Test
+  def testDynamicConnectionQuota(): Unit = {
    val initialConnectionCount = connectionCount
    val maxConnectionsPerIP = 5
 
+    def connectAndVerify: Unit = {
+      val socket = connect()
+      try {
+        sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
+      } finally {
+        socket.close()
+      }
+    }
+
    val props = new Properties
    props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)
    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString))
 
-    //wait for adminClient connections to close
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
-
-    //create connections up to maxConnectionsPerIP - 1, leave space for one connection
-    var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer))
-
-    // produce should succeed
-    var produceResponse = sendProduceRequest()
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse.error)
-
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIP - 1), "produce request connection is not closed")
-    conns = conns :+ connect(socketServer)
-    // now try one more (should fail)
-    intercept[IOException](sendProduceRequest())
-
-    conns.foreach(conn => conn.close())
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
+    verifyMaxConnections(maxConnectionsPerIP, () => connectAndVerify)
 
    // Increase MaxConnectionsPerIpOverrides for localhost to 7
    val maxConnectionsPerIPOverride = 7
    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")
    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride"))
 
-    //wait for adminClient connections to close
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
+    verifyMaxConnections(maxConnectionsPerIPOverride, () => connectAndVerify)
+  }
 
-    //create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection
-    conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer))
+  @Test
+  def testDynamicListenerConnectionQuota(): Unit = {
+    val socketServer = servers.head.socketServer
+    val initialConnectionCount = connectionCount
 
-    // send should succeed
-    produceResponse = sendProduceRequest()
-    assertEquals(1, produceResponse.responses.size)
-    val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse1.error)
+    def connectAndVerify(): Unit = {
+      val socket = connect("PLAINTEXT")
+      socket.setSoTimeout(1000)
+      try {
+        val response = sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
+        assertEquals(0, response.remaining)
+      } finally {
+        socket.close()
+      }
+    }
 
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "produce request connection is not closed")
-    conns = conns :+ connect(socketServer)
-    // now try one more (should fail)
-    intercept[IOException](sendProduceRequest())
+    // Reduce total broker MaxConnections to 5 at the cluster level
+    val props = new Properties
+    props.put(KafkaConfig.MaxConnectionsProp, "5")
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsProp, "5"))
+    verifyMaxConnections(5, () => connectAndVerify)
 
-    //close one connection
-    conns.head.close()
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "connection is not closed")
-    // send should succeed
-    sendProduceRequest()
+    // Create another listener and verify listener connection limit of 5 for each listener
+    val newListeners = "PLAINTEXT://localhost:0,INTERNAL://localhost:0"
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT")
+    props.put(KafkaConfig.MaxConnectionsProp, "10")
+    props.put("listener.name.internal.max.connections", "5")
+    props.put("listener.name.plaintext.max.connections", "5")
+    reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
+    waitForListener("INTERNAL")
+
+    var conns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
+    conns ++= (5 until 10).map(_ => connect("INTERNAL"))
+    conns.foreach(verifyConnection)
+    conns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+
+    // Increase MaxConnections for PLAINTEXT listener to 7 at the broker level
+    val maxConnectionsPlaintext = 7
+    val listenerProp = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
+    props.put(listenerProp, maxConnectionsPlaintext.toString)
+    reconfigureServers(props, perBrokerConfig = true, (listenerProp, maxConnectionsPlaintext.toString))
+    verifyMaxConnections(maxConnectionsPlaintext, () => connectAndVerify)
+
+    // Verify that connection blocked on the limit connects successfully when an existing connection is closed
+    val plaintextConnections = (connectionCount until maxConnectionsPlaintext).map(_ => connect("PLAINTEXT"))
+    executor = Executors.newSingleThreadExecutor
+    val future = executor.submit(CoreUtils.runnable { createAndVerifyConnection() })
+    Thread.sleep(100)
+    assertFalse(future.isDone)
+    plaintextConnections.head.close()
+    future.get(30, TimeUnit.SECONDS)
+    plaintextConnections.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+
+    // Verify that connections on inter-broker listener succeed even if broker max connections has been
+    // reached by closing connections on another listener
+    var plaintextConns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
+    val internalConns = (5 until 10).map(_ => connect("INTERNAL"))
+    plaintextConns.foreach(verifyConnection)
+    internalConns.foreach(verifyConnection)
+    plaintextConns ++= (0 until 2).map(_ => connect("PLAINTEXT"))
+    TestUtils.waitUntilTrue(() => connectionCount <= 10, "Internal connections not closed")
+    plaintextConns.foreach(verifyConnection)
+    intercept[IOException](internalConns.foreach { socket => sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket) })
+    plaintextConns.foreach(_.close())
+    internalConns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
  }
 
  private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String)): Unit = {
+    val initialConnectionCount = connectionCount
    val adminClient = createAdminClient()
    TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get()
    waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
    adminClient.close()
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Admin client connection not closed")
+  }
+
+  private def waitForListener(listenerName: String): Unit = {
+    TestUtils.retry(maxWaitMs = 10000) {
+      try {
+        assertTrue(servers.head.socketServer.boundPort(ListenerName.normalised(listenerName)) > 0)
+      } catch {
+        case e: KafkaException => throw new AssertionError(e)
+      }
+    }
  }
 
  private def createAdminClient(): AdminClient = {
@@ -135,12 +201,59 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
    }
  }
 
-  private def sendProduceRequest(): ProduceResponse = {
+  private def produceRequest: ProduceRequest = {
    val topicPartition = new TopicPartition(topic, 0)
    val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
    val partitionRecords = Map(topicPartition -> memoryRecords)
-    val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
-    val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer)
-    ProduceResponse.parse(response, request.version)
+    ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
+  }
+
+  def connectionCount: Int = servers.head.socketServer.connectionCount(localAddress)
+
+  def connect(listener: String = "PLAINTEXT"): Socket = {
+    val listenerName = ListenerName.normalised(listener)
+    new Socket("localhost", servers.head.socketServer.boundPort(listenerName))
+  }
+
+  private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = {
+    val socket = connect(listener)
+    try {
+      verifyConnection(socket)
+    } finally {
+      socket.close()
+    }
+  }
+
+  private def verifyConnection(socket: Socket): Unit = {
+    val request = produceRequest
+    val response = sendAndReceive(request, ApiKeys.PRODUCE, socket)
+    val produceResponse = ProduceResponse.parse(response, request.version)
+    assertEquals(1, produceResponse.responses.size)
+    val (_, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(Errors.NONE, partitionResponse.error)
+  }
+
+  private def verifyMaxConnections(maxConnections: Int, connectWithFailure: () => Unit): Unit = {
+    val initialConnectionCount = connectionCount
+
+    // create connections up to maxConnections - 1, leave space for one connection
+    var conns = (connectionCount until (maxConnections - 1)).map(_ => connect("PLAINTEXT"))
+
+    // produce should succeed on a new connection
+    createAndVerifyConnection()
+
+    TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "produce request connection is not closed")
+    conns = conns :+ connect("PLAINTEXT")
+
+    // now try one more (should fail)
+    intercept[IOException](connectWithFailure.apply())
+
+    // close one connection
+    conns.head.close()
+    TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "connection is not closed")
+    createAndVerifyConnection()
+
+    conns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 45ef18f5187..5d20da64cb6 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -186,6 +186,12 @@ class DynamicBrokerConfigTest extends JUnitSuite {
 
    //test invalid address
    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig = true, expectFailure = true)
+
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = false, expectFailure = false)
+    val listenerMaxConnectionsProp = s"listener.name.external.${KafkaConfig.MaxConnectionsProp}"
+    verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = false, expectFailure = false)
  }
 
  private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
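One behavioural note the tests above rely on: the limit does not refuse the TCP handshake itself. An extra socket connects (the OS accepts it into the listen backlog), but the acceptor stops servicing it until a slot frees, so a request on it eventually fails — the tests assert this with `intercept[IOException]`. A rough client-side probe of the same behaviour; host, port and limit are placeholders, and the exact failure mode may be a read timeout rather than an immediate error:

    import java.net.Socket

    def probeConnectionLimit(host: String, port: Int, limit: Int): Unit = {
      // Open one more socket than the configured limit.
      val sockets = (0 to limit).map(_ => new Socket(host, port))
      try {
        val extra = sockets.last
        extra.setSoTimeout(5000)
        extra.getOutputStream.write(Array.fill[Byte](4)(0)) // may be buffered by the OS
        val b = extra.getInputStream.read() // expected: -1, IOException or read timeout
        println(s"read returned $b")
      } finally {
        sockets.foreach(_.close())
      }
    }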