diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index e431e2788d8..b3497973afd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -919,6 +919,28 @@ public class Selector implements Selectable, AutoCloseable {
         return closingChannels.get(id);
     }
 
+    /**
+     * Returns the lowest priority channel chosen using the following sequence:
+     *   1) If one or more channels are in closing state, return any one of them
+     *   2) If idle expiry manager is enabled, return the least recently updated channel
+     *   3) Otherwise return any of the channels
+     *
+     * This method is used to close a channel to accommodate a new channel on the inter-broker listener
+     * when the broker-wide `max.connections` limit is enabled.
+     */
+    public KafkaChannel lowestPriorityChannel() {
+        KafkaChannel channel = null;
+        if (!closingChannels.isEmpty()) {
+            channel = closingChannels.values().iterator().next();
+        } else if (idleExpiryManager != null && !idleExpiryManager.lruConnections.isEmpty()) {
+            String channelId = idleExpiryManager.lruConnections.keySet().iterator().next();
+            channel = channel(channelId);
+        } else if (!channels.isEmpty()) {
+            channel = channels.values().iterator().next();
+        }
+        return channel;
+    }
+
     /**
      * Get the channel associated with selectionKey
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 7cb89c28de2..8cbada750c2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -688,6 +688,36 @@ public class SelectorTest {
         assertEquals((double) conns, getMetric("connection-count").metricValue());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testLowestPriorityChannel() throws Exception {
+        int conns = 5;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        for (int i = 0; i < conns; i++) {
+            connect(String.valueOf(i), addr);
+        }
+        assertNotNull(selector.lowestPriorityChannel());
+        for (int i = conns - 1; i >= 0; i--) {
+            if (i != 2)
+                assertEquals("", blockingRequest(String.valueOf(i), ""));
+            time.sleep(10);
+        }
+        assertEquals("2", selector.lowestPriorityChannel().id());
+
+        Field field = Selector.class.getDeclaredField("closingChannels");
+        field.setAccessible(true);
+        Map<String, KafkaChannel> closingChannels = (Map<String, KafkaChannel>) field.get(selector);
+        closingChannels.put("3", selector.channel("3"));
+        assertEquals("3", selector.lowestPriorityChannel().id());
+        closingChannels.remove("3");
+
+        for (int i = 0; i < conns; i++) {
+            selector.close(String.valueOf(i));
+        }
+        assertNull(selector.lowestPriorityChannel());
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
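The "least recently updated" guarantee in step 2 relies on the selector's idle-expiry map keeping entries in access order, so its first key is the LRU channel. A standalone sketch of that property (hypothetical names; only `LinkedHashMap` semantics are assumed here, not broker code):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

object LruOrderSketch extends App {
  // accessOrder = true re-orders entries on access, which is how the selector's
  // idleExpiryManager.lruConnections is assumed to track channel activity
  val lru = new JLinkedHashMap[String, Long](16, 0.75f, true)
  lru.put("0", 0L); lru.put("1", 1L); lru.put("2", 2L)
  lru.get("0"); lru.get("1")            // channels "0" and "1" see recent activity
  println(lru.keySet.iterator.next())   // prints "2": the least recently used id
}
```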
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d8c4a74cdd2..e06cee75ffe 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net._
 import java.nio.channels._
 import java.nio.channels.{Selector => NSelector}
+import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.function.Supplier
@@ -32,15 +33,16 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingRespo
 import kafka.network.Processor._
 import kafka.network.SocketServer._
 import kafka.security.CredentialProvider
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils._
+import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.{KafkaException, Reconfigurable}
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
 import org.apache.kafka.common.metrics.stats.Total
 import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
-import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -70,7 +72,11 @@ import scala.util.control.ControlThrowable
  *    Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
  *    1 Handler thread that handles requests and produce responses back to the processor thread for writing.
  */
-class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
+class SocketServer(val config: KafkaConfig,
+                   val metrics: Metrics,
+                   val time: Time,
+                   val credentialProvider: CredentialProvider)
+  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
@@ -109,7 +115,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
    */
   def startup(startupProcessors: Boolean = true) {
     this.synchronized {
-      connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
+      connectionQuotas = new ConnectionQuotas(config, time)
       createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
       createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
       if (startupProcessors) {
@@ -212,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                     endpoints: Seq[EndPoint]): Unit = synchronized {
     endpoints.foreach { endpoint =>
+      connectionQuotas.addListener(config, endpoint.listenerName)
       val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
       addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
       KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
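SocketServer now participates in dynamic reconfiguration directly, and every listener is registered with the quotas before its acceptor thread starts. For reference, the `BrokerReconfigurable` contract it implements is defined in DynamicBrokerConfig.scala; a sketch of the trait's shape, assuming it is unchanged by this patch:

```scala
trait BrokerReconfigurable {
  // configs this component can apply at runtime
  def reconfigurableConfigs: Set[String]
  // reject an invalid update before any state changes
  def validateReconfiguration(newConfig: KafkaConfig): Unit
  // apply the delta between the old and new broker config
  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
```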
@@ -223,6 +230,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
     endpointOpt.foreach { endpoint =>
+      connectionQuotas.addListener(config, endpoint.listenerName)
       val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
       val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas,
         endpoint.listenerName, endpoint.securityProtocol, memoryPool)
       controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
@@ -324,18 +332,33 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
     info(s"Removing data-plane listeners for endpoints $listenersRemoved")
     listenersRemoved.foreach { endpoint =>
+      connectionQuotas.removeListener(config, endpoint.listenerName)
       dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
     }
   }
 
-  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
-    info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
-    connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
-  }
+  override def reconfigurableConfigs: Set[String] = SocketServer.ReconfigurableConfigs
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+  }
 
-  def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]): Unit = {
-    info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
-    connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val maxConnectionsPerIp = newConfig.maxConnectionsPerIp
+    if (maxConnectionsPerIp != oldConfig.maxConnectionsPerIp) {
+      info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
+      connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+    }
+    val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
+    if (maxConnectionsPerIpOverrides != oldConfig.maxConnectionsPerIpOverrides) {
+      info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
+      connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+    }
+    val maxConnections = newConfig.maxConnections
+    if (maxConnections != oldConfig.maxConnections) {
+      info(s"Updating broker-wide maxConnections: $maxConnections")
+      connectionQuotas.updateBrokerMaxConnections(maxConnections)
+    }
   }
 
   // `protected` for test usage
@@ -373,6 +396,13 @@ object SocketServer {
   val ControlPlaneThreadPrefix = "control-plane"
   val DataPlaneMetricPrefix = ""
   val ControlPlaneMetricPrefix = "ControlPlane"
+
+  val ReconfigurableConfigs = Set(
+    KafkaConfig.MaxConnectionsPerIpProp,
+    KafkaConfig.MaxConnectionsPerIpOverridesProp,
+    KafkaConfig.MaxConnectionsProp)
+
+  val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp)
 }
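The three branches of `reconfigure` fire whenever a dynamic broker config lands. A hypothetical ops sketch of triggering them with AdminClient (the endpoint and broker id are made up; the listener-level key works the same way with a `listener.name.` prefix):

```scala
import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object UpdateMaxConnections extends App {
  val props = new java.util.Properties
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = AdminClient.create(props)
  // dynamically lower the broker-wide limit on broker "0";
  // "listener.name.external.max.connections" would target a single listener
  val broker = new ConfigResource(ConfigResource.Type.BROKER, "0")
  val update = new Config(Collections.singleton(new ConfigEntry("max.connections", "1000")))
  admin.alterConfigs(Collections.singletonMap(broker, update)).all.get()
  admin.close()
}
```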
@@ -427,10 +457,10 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
 
   /**
    * Close `channel` and decrement the connection count.
    */
-  def close(channel: SocketChannel): Unit = {
+  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
     if (channel != null) {
       debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
-      connectionQuotas.dec(channel.socket.getInetAddress)
+      connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
       CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
       CoreUtils.swallow(channel.close(), this, Level.ERROR)
     }
@@ -500,6 +530,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     var currentProcessorIndex = 0
     while (isRunning) {
       try {
+
         val ready = nioSelector.select(500)
         if (ready > 0) {
           val keys = nioSelector.selectedKeys()
@@ -508,6 +539,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
             try {
               val key = iter.next
               iter.remove()
+
               if (key.isAcceptable) {
                 accept(key).foreach { socketChannel =>
@@ -582,7 +614,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
     val socketChannel = serverSocketChannel.accept()
     try {
-      connectionQuotas.inc(socketChannel.socket().getInetAddress)
+      connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
       socketChannel.configureBlocking(false)
       socketChannel.socket().setTcpNoDelay(true)
       socketChannel.socket().setKeepAlive(true)
@@ -592,7 +624,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     } catch {
       case e: TooManyConnectionsException =>
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
-        close(socketChannel)
+        close(endPoint.listenerName, socketChannel)
         None
     }
   }
@@ -731,6 +763,7 @@ private[kafka] class Processor(val id: Int,
           processCompletedReceives()
           processCompletedSends()
           processDisconnected()
+          closeExcessConnections()
         } catch {
           // We catch all the throwables here to prevent the processor thread from exiting. We do this because
           // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
@@ -911,13 +944,21 @@ private[kafka] class Processor(val id: Int,
         }.remoteHost
       inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
       // the channel has been closed by the selector but the quotas still need to be updated
-      connectionQuotas.dec(InetAddress.getByName(remoteHost))
+      connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
     } catch {
       case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
     }
   }
 
+  private def closeExcessConnections(): Unit = {
+    if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
+      val channel = selector.lowestPriorityChannel()
+      if (channel != null)
+        close(channel.id)
+    }
+  }
+
   /**
    * Close the connection identified by `connectionId` and decrement the connection count.
    * The channel will be immediately removed from the selector's `channels` or `closingChannels`
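`closeExcessConnections` runs once per processor poll pass, so enforcement closes at most one channel per processor per iteration. A toy model (not broker code; the counts are made up) of why that drains overload gradually instead of mass-dropping clients:

```scala
object DrainRateSketch extends App {
  // assume 5 connections over the broker-wide limit and 3 processor threads
  val excess = 5
  val processors = 3
  var remaining = excess
  var iterations = 0
  while (remaining > 0) {
    // each processor closes at most one lowest-priority channel per poll pass
    remaining -= math.min(processors, remaining)
    iterations += 1
  }
  println(s"$excess excess connections drained in $iterations poll passes") // 2
}
```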
@@ -930,7 +971,7 @@ private[kafka] class Processor(val id: Int,
     debug(s"Closing selector connection $connectionId")
     val address = channel.socketAddress
     if (address != null)
-      connectionQuotas.dec(address)
+      connectionQuotas.dec(listenerName, address)
     selector.close(connectionId)
 
     inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
@@ -977,7 +1018,7 @@ private[kafka] class Processor(val id: Int,
       case e: Throwable =>
         val remoteAddress = channel.socket.getRemoteSocketAddress
         // need to close the channel here to avoid a socket leak.
-        close(channel)
+        close(listenerName, channel)
         processException(s"Processor $id closed connection from $remoteAddress", e)
     }
   }
@@ -1058,31 +1099,72 @@ private[kafka] class Processor(val id: Int,
 }
 
-class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
+class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
 
-  @volatile private var defaultMaxConnectionsPerIp = defaultMax
-  @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
+  @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
+  @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
+  @volatile private var brokerMaxConnections = config.maxConnections
   private val counts = mutable.Map[InetAddress, Int]()
 
-  def inc(address: InetAddress) {
+  // Listener counts and configs are synchronized on `counts`
+  private val listenerCounts = mutable.Map[ListenerName, Int]()
+  private val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
+  @volatile private var totalCount = 0
+
+  def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter) {
     counts.synchronized {
+      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+
       val count = counts.getOrElseUpdate(address, 0)
       counts.put(address, count + 1)
+      totalCount += 1
+      if (listenerCounts.contains(listenerName)) {
+        listenerCounts.put(listenerName, listenerCounts(listenerName) + 1)
+      }
       val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp)
       if (count >= max)
         throw new TooManyConnectionsException(address, max)
     }
   }
 
-  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+  private[network] def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
     defaultMaxConnectionsPerIp = maxConnectionsPerIp
   }
 
-  def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
+  private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
     maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
   }
 
-  def dec(address: InetAddress) {
+  private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit = {
+    counts.synchronized {
+      brokerMaxConnections = maxConnections
+      counts.notifyAll()
+    }
+  }
+
+  private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
+    counts.synchronized {
+      if (!maxConnectionsPerListener.contains(listenerName)) {
+        val newListenerQuota = new ListenerConnectionQuota(counts, listenerName)
+        maxConnectionsPerListener.put(listenerName, newListenerQuota)
+        listenerCounts.put(listenerName, 0)
+        config.addReconfigurable(newListenerQuota)
+      }
+      counts.notifyAll()
+    }
+  }
+
+  private[network] def removeListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
+    counts.synchronized {
+      maxConnectionsPerListener.remove(listenerName).foreach { listenerQuota =>
+        listenerCounts.remove(listenerName)
+        counts.notifyAll() // wake up any waiting acceptors to close cleanly
+        config.removeReconfigurable(listenerQuota)
+      }
+    }
+  }
+
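`inc()` can block the acceptor inside `waitForConnectionSlot` (shown further down) until `dec()`, `updateBrokerMaxConnections()`, `addListener()` or `removeListener()` calls `notifyAll()`. A self-contained sketch of that monitor protocol, with hypothetical names standing in for the quota state:

```scala
object ConnectionSlotMonitorSketch extends App {
  private val lock = new Object
  private var total = 0
  private var limit = 1

  def acquire(): Unit = lock.synchronized {
    while (total >= limit) lock.wait() // acceptor blocks here, like waitForConnectionSlot
    total += 1
  }

  def release(): Unit = lock.synchronized {
    total -= 1
    lock.notifyAll() // like dec(): wake any acceptor waiting for a slot
  }

  acquire()
  val t = new Thread(() => acquire()) // blocks: the limit is reached
  t.start()
  release()                           // frees a slot; the waiting thread proceeds
  t.join()
  println(s"total=$total")            // total=1
}
```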
+  def dec(listenerName: ListenerName, address: InetAddress) {
     counts.synchronized {
       val count = counts.getOrElse(address,
         throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address"))
@@ -1090,6 +1172,19 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
         counts.remove(address)
       else
         counts.put(address, count - 1)
+
+      if (totalCount <= 0)
+        error("Attempted to decrease total connection count for broker with no connections")
+      totalCount -= 1
+
+      if (maxConnectionsPerListener.contains(listenerName)) {
+        val listenerCount = listenerCounts(listenerName)
+        if (listenerCount == 0)
+          error(s"Attempted to decrease connection count for listener $listenerName with no connections")
+        else
+          listenerCounts.put(listenerName, listenerCount - 1)
+      }
+      counts.notifyAll() // wake up any acceptors waiting to process a new connection since listener connection limit was reached
     }
   }
 
@@ -1097,6 +1192,72 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
     counts.getOrElse(address, 0)
   }
 
+  private def waitForConnectionSlot(listenerName: ListenerName,
+                                    acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
+    counts.synchronized {
+      if (!connectionSlotAvailable(listenerName)) {
+        val startNs = time.nanoseconds
+        do {
+          counts.wait()
+        } while (!connectionSlotAvailable(listenerName))
+        acceptorBlockedPercentMeter.mark(time.nanoseconds - startNs)
+      }
+    }
+  }
+
+  // Invoked in every processor poll iteration; at most one LRU connection is closed
+  // per iteration if the broker-wide limit has been exceeded
+  def maxConnectionsExceeded(listenerName: ListenerName): Boolean = {
+    totalCount > brokerMaxConnections && !protectedListener(listenerName)
+  }
+
+  private def connectionSlotAvailable(listenerName: ListenerName): Boolean = {
+    if (listenerCounts(listenerName) >= maxListenerConnections(listenerName))
+      false
+    else if (protectedListener(listenerName))
+      true
+    else
+      totalCount < brokerMaxConnections
+  }
+
+  private def protectedListener(listenerName: ListenerName): Boolean =
+    config.interBrokerListenerName == listenerName && config.listeners.size > 1
+
+  private def maxListenerConnections(listenerName: ListenerName): Int =
+    maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
+
+  class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
+    @volatile private var _maxConnections = Int.MaxValue
+    private val listenerPropName = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
+
+    def maxConnections: Int = _maxConnections
+
+    override def listenerName(): ListenerName = listener
+
+    override def configure(configs: util.Map[String, _]): Unit = {
+      _maxConnections = maxConnections(configs)
+    }
+
+    override def reconfigurableConfigs(): util.Set[String] = {
+      SocketServer.ListenerReconfigurableConfigs.asJava
+    }
+
+    override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+      val value = maxConnections(configs)
+      if (value <= 0)
+        throw new ConfigException(s"Invalid value $value for $listenerPropName")
+    }
+
+    override def reconfigure(configs: util.Map[String, _]): Unit = {
+      lock.synchronized {
+        _maxConnections = maxConnections(configs)
+        lock.notifyAll()
+      }
+    }
+
+    private def maxConnections(configs: util.Map[String, _]): Int = {
+      Option(configs.get(KafkaConfig.MaxConnectionsProp)).map(_.toString.toInt).getOrElse(Int.MaxValue)
+    }
+  }
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too many connections from $ip (maximum = $count)")
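A worked example of the admission rules above, written as a pure mirror of `connectionSlotAvailable` (a sketch for reasoning only; the real method reads shared mutable state under the `counts` lock):

```scala
object SlotRuleSketch extends App {
  // mirrors connectionSlotAvailable: listener limit first, then the protected
  // inter-broker exemption, then the broker-wide limit
  def slotAvailable(listenerCount: Int, listenerMax: Int,
                    total: Int, brokerMax: Int, isProtected: Boolean): Boolean =
    if (listenerCount >= listenerMax) false
    else if (isProtected) true
    else total < brokerMax

  // a listener at 799/800 but the broker already at 1000/1000: the acceptor blocks
  assert(!slotAvailable(799, 800, 1000, 1000, isProtected = false))
  // the inter-broker listener with no listener limit: admitted despite the broker limit
  assert(slotAvailable(10, Int.MaxValue, 1000, 1000, isProtected = true))
  println("slot rules hold")
}
```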
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b02b842b954..4f27226265e 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogConfig, LogManager}
+import kafka.network.SocketServer
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -81,10 +82,11 @@ object DynamicBrokerConfig {
     DynamicThreadPool.ReconfigurableConfigs ++
     Set(KafkaConfig.MetricReporterClassesProp) ++
     DynamicListenerConfig.ReconfigurableConfigs ++
-    DynamicConnectionQuota.ReconfigurableConfigs
+    SocketServer.ReconfigurableConfigs
 
+  private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
   private val PerBrokerConfigs = DynamicSecurityConfigs ++
-    DynamicListenerConfig.ReconfigurableConfigs
+    DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
   private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
 
   private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
@@ -135,7 +137,13 @@ object DynamicBrokerConfig {
 
   private def perBrokerConfigs(props: Properties): Set[String] = {
     val configNames = props.asScala.keySet
-    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
+    def perBrokerListenerConfig(name: String): Boolean = {
+      name match {
+        case ListenerConfigRegex(baseName) => !ClusterLevelListenerConfigs.contains(baseName)
+        case _ => false
+      }
+    }
+    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(perBrokerListenerConfig)
   }
 
   private def nonDynamicConfigs(props: Properties): Set[String] = {
@@ -224,7 +232,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
-    addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
+    addBrokerReconfigurable(kafkaServer.socketServer)
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -789,7 +797,10 @@ object DynamicListenerConfig {
     KafkaConfig.SaslLoginRefreshWindowFactorProp,
     KafkaConfig.SaslLoginRefreshWindowJitterProp,
     KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp,
-    KafkaConfig.SaslLoginRefreshBufferSecondsProp
+    KafkaConfig.SaslLoginRefreshBufferSecondsProp,
+
+    // Connection limit configs
+    KafkaConfig.MaxConnectionsProp
   )
 }
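The effect of `ClusterLevelListenerConfigs`: a listener-prefixed `max.connections` is no longer forced to be a per-broker update. A sketch under the assumption that `ListenerConfigRegex` captures the un-prefixed config name:

```scala
object PerBrokerClassificationSketch extends App {
  // assumed shape of DynamicBrokerConfig.ListenerConfigRegex
  val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
  val clusterLevelListenerConfigs = Set("max.connections")

  def perBrokerListenerConfig(name: String): Boolean = name match {
    case ListenerConfigRegex(baseName) => !clusterLevelListenerConfigs.contains(baseName)
    case _ => false
  }

  // listener-scoped security configs stay per-broker ...
  assert(perBrokerListenerConfig("listener.name.external.ssl.keystore.location"))
  // ... but listener-scoped connection limits may be applied cluster-wide
  assert(!perBrokerListenerConfig("listener.name.external.max.connections"))
}
```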
@@ -884,24 +895,5 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
 
 }
 
-object DynamicConnectionQuota {
-  val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp)
-}
-
-class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable {
-
-  override def reconfigurableConfigs: Set[String] = {
-    DynamicConnectionQuota.ReconfigurableConfigs
-  }
-
-  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-  }
-
-  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
-    if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
-      server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
-  }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 02e171527e5..8ab7b437550 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -75,6 +75,7 @@ object Defaults {
   val SocketRequestMaxBytes: Int = 100 * 1024 * 1024
   val MaxConnectionsPerIp: Int = Int.MaxValue
   val MaxConnectionsPerIpOverrides: String = ""
+  val MaxConnections: Int = Int.MaxValue
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
   val RequestTimeoutMs = 30000
   val FailedAuthenticationDelayMs = 100
@@ -293,6 +294,7 @@ object KafkaConfig {
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
   val MaxConnectionsPerIpProp = "max.connections.per.ip"
   val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
+  val MaxConnectionsProp = "max.connections"
   val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
   val FailedAuthenticationDelayMsProp = "connection.failed.authentication.delay.ms"
   /***************** rack configuration *************/
@@ -570,8 +572,15 @@ object KafkaConfig {
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
   val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
   val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides " +
-    "configured using " + MaxConnectionsPerIpOverridesProp + " property"
-  val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is \"hostName:100,127.0.0.1:200\""
+    s"configured using $MaxConnectionsPerIpOverridesProp property. New connections from the ip address are dropped if the limit is reached."
+  val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. " +
+    "An example value is \"hostName:100,127.0.0.1:200\""
+  val MaxConnectionsDoc = "The maximum number of connections we allow in the broker at any time. This limit is applied in addition " +
+    s"to any per-ip limits configured using $MaxConnectionsPerIpProp. Listener-level limits may also be configured by prefixing the " +
+    s"config name with the listener prefix, for example, listener.name.internal.$MaxConnectionsProp. The broker-wide limit " +
+    "should be configured based on broker capacity while listener limits should be configured based on application requirements. " +
+    "New connections are blocked if either the listener or the broker limit is reached. Connections on the inter-broker listener are " +
+    "permitted even if the broker-wide limit is reached. The least recently used connection on another listener will be closed in this case."
   val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
   val FailedAuthenticationDelayMsDoc = "Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. " +
     s"This must be configured to be less than $ConnectionsMaxIdleMsProp to prevent connection timeout."
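The listener-prefixed form mentioned in `MaxConnectionsDoc` is derived from `ListenerName.configPrefix`, the same way `ListenerConnectionQuota` builds `listenerPropName` earlier in this patch. A small sketch:

```scala
import org.apache.kafka.common.network.ListenerName

object ListenerPrefixSketch extends App {
  val internal = ListenerName.normalised("INTERNAL")
  // configPrefix lower-cases the listener name: "listener.name.internal."
  val prop = s"${internal.configPrefix}max.connections"
  println(prop) // listener.name.internal.max.connections
}
```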
@@ -872,6 +881,7 @@ object KafkaConfig {
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
       .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
       .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
+      .define(MaxConnectionsProp, INT, Defaults.MaxConnections, atLeast(0), MEDIUM, MaxConnectionsDoc)
       .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
       .define(FailedAuthenticationDelayMsProp, INT, Defaults.FailedAuthenticationDelayMs, atLeast(0), LOW, FailedAuthenticationDelayMsDoc)
@@ -1163,6 +1173,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
   val maxConnectionsPerIpOverrides: Map[String, Int] = getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
+  def maxConnections = getInt(KafkaConfig.MaxConnectionsProp)
   val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
   val failedAuthenticationDelayMs = getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
@@ -1331,6 +1342,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     dynamicConfig.addReconfigurable(reconfigurable)
   }
 
+  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
+    dynamicConfig.removeReconfigurable(reconfigurable)
+  }
+
   def logRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
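Note that `maxConnections` is added as a `def` while its neighbours are `val`s: a dynamically updatable setting must be re-read on every access rather than captured once at construction. A generic illustration of the difference (toy class, not KafkaConfig):

```scala
object DefVsValSketch extends App {
  val props = new java.util.HashMap[String, Int]()
  props.put("max.connections", 100)

  class ConfigLike {
    val frozenAtConstruction: Int = props.get("max.connections") // captured once
    def readOnEachAccess: Int = props.get("max.connections")     // sees later updates
  }

  val config = new ConfigLike
  props.put("max.connections", 200)    // simulate a dynamic update
  println(config.frozenAtConstruction) // 100
  println(config.readOnEachAccess)     // 200
}
```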
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 78d9af7bdfc..514d879a1d9 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -21,18 +21,19 @@ package kafka.network
 
 import java.io.IOException
 import java.net.{InetAddress, Socket}
 import java.util.Properties
+import java.util.concurrent._
 
 import kafka.server.{BaseRequestTest, KafkaConfig}
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.Assert.assertEquals
-import org.junit.{Before, Test}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 
@@ -41,6 +42,9 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   override def numBrokers = 1
 
   val topic = "test"
+  val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+  val localAddress = InetAddress.getByName("127.0.0.1")
+  var executor: ExecutorService = _
 
   @Before
   override def setUp(): Unit = {
@@ -48,76 +52,138 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
   }
 
-  @Test
-  def testDynamicConnectionQuota(): Unit = {
-    def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
-      new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
+  @After
+  override def tearDown(): Unit = {
+    try {
+      if (executor != null) {
+        executor.shutdownNow()
+        assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS))
+      }
+    } finally {
+      super.tearDown()
     }
+  }
 
-    val socketServer = servers.head.socketServer
-    val localAddress = InetAddress.getByName("127.0.0.1")
-    def connectionCount = socketServer.connectionCount(localAddress)
+  override protected def propertyOverrides(properties: Properties): Unit = {
+    super.propertyOverrides(properties)
+  }
+
+  @Test
+  def testDynamicConnectionQuota(): Unit = {
     val initialConnectionCount = connectionCount
     val maxConnectionsPerIP = 5
 
+    def connectAndVerify(): Unit = {
+      val socket = connect()
+      try {
+        sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
+      } finally {
+        socket.close()
+      }
+    }
+
     val props = new Properties
     props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString))
 
-    //wait for adminClient connections to close
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
-
-    //create connections up to maxConnectionsPerIP - 1, leave space for one connection
-    var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer))
-
-    // produce should succeed
-    var produceResponse = sendProduceRequest()
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse.error)
-
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIP - 1), "produce request connection is not closed")
-    conns = conns :+ connect(socketServer)
-    // now try one more (should fail)
-    intercept[IOException](sendProduceRequest())
-
-    conns.foreach(conn => conn.close())
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
+    verifyMaxConnections(maxConnectionsPerIP, () => connectAndVerify())
 
     // Increase MaxConnectionsPerIpOverrides for localhost to 7
    val maxConnectionsPerIPOverride = 7
    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")
    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride"))
 
-    //wait for adminClient connections to close
-    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
+    verifyMaxConnections(maxConnectionsPerIPOverride, () => connectAndVerify())
+  }
 
-    //create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection
-    conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer))
+  @Test
+  def testDynamicListenerConnectionQuota(): Unit = {
+    val socketServer = servers.head.socketServer
+    val initialConnectionCount = connectionCount
 
-    // send should succeed
-    produceResponse = sendProduceRequest()
-    assertEquals(1, produceResponse.responses.size)
-    val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse1.error)
+    def connectAndVerify(): Unit = {
+      val socket = connect("PLAINTEXT")
+      socket.setSoTimeout(1000)
+      try {
+        val response = sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
+        assertEquals(0, response.remaining)
+      } finally {
+        socket.close()
+      }
+    }
 
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "produce request connection is not closed")
-    conns = conns :+ connect(socketServer)
-    // now try one more (should fail)
-    intercept[IOException](sendProduceRequest())
+    // Reduce total broker MaxConnections to 5 at the cluster level
+    val props = new Properties
+    props.put(KafkaConfig.MaxConnectionsProp, "5")
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsProp, "5"))
+    verifyMaxConnections(5, () => connectAndVerify())
 
-    //close one connection
-    conns.head.close()
-    TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "connection is not closed")
-    // send should succeed
-    sendProduceRequest()
+    // Create another listener and verify listener connection limit of 5 for each listener
+    val newListeners = "PLAINTEXT://localhost:0,INTERNAL://localhost:0"
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT")
+    props.put(KafkaConfig.MaxConnectionsProp, "10")
+    props.put("listener.name.internal.max.connections", "5")
+    props.put("listener.name.plaintext.max.connections", "5")
+    reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
+    waitForListener("INTERNAL")
+
+    var conns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
+    conns ++= (5 until 10).map(_ => connect("INTERNAL"))
+    conns.foreach(verifyConnection)
+    conns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+
+    // Increase MaxConnections for the PLAINTEXT listener to 7 at the broker level
+    val maxConnectionsPlaintext = 7
+    val listenerProp = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
+    props.put(listenerProp, maxConnectionsPlaintext.toString)
+    reconfigureServers(props, perBrokerConfig = true, (listenerProp, maxConnectionsPlaintext.toString))
+    verifyMaxConnections(maxConnectionsPlaintext, () => connectAndVerify())
+
+    // Verify that a connection blocked on the limit connects successfully when an existing connection is closed
+    val plaintextConnections = (connectionCount until maxConnectionsPlaintext).map(_ => connect("PLAINTEXT"))
+    executor = Executors.newSingleThreadExecutor
+    val future = executor.submit(CoreUtils.runnable { createAndVerifyConnection() })
+    Thread.sleep(100)
+    assertFalse(future.isDone)
+    plaintextConnections.head.close()
+    future.get(30, TimeUnit.SECONDS)
+    plaintextConnections.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
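The blocked-connect assertion above is a reusable pattern: race the operation that should block against an explicit release of capacity. In isolation (hypothetical helper; plain `Runnable` stands in for `CoreUtils.runnable`):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object BlockedUntilReleasedSketch {
  def verifyBlocksUntilReleased(blockingOp: () => Unit, release: () => Unit): Unit = {
    val executor = Executors.newSingleThreadExecutor
    try {
      val future = executor.submit(new Runnable { def run(): Unit = blockingOp() })
      Thread.sleep(100)                 // give the operation time to start and block
      assert(!future.isDone)            // still blocked while at the limit
      release()                         // free one slot
      future.get(30, TimeUnit.SECONDS)  // now it must complete promptly
    } finally executor.shutdownNow()
  }
}
```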
+
+    // Verify that connections on the inter-broker listener succeed even if the broker max connections has been
+    // reached, by closing connections on another listener
+    var plaintextConns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
+    val internalConns = (5 until 10).map(_ => connect("INTERNAL"))
+    plaintextConns.foreach(verifyConnection)
+    internalConns.foreach(verifyConnection)
+    plaintextConns ++= (0 until 2).map(_ => connect("PLAINTEXT"))
+    TestUtils.waitUntilTrue(() => connectionCount <= 10, "Internal connections not closed")
+    plaintextConns.foreach(verifyConnection)
+    intercept[IOException](internalConns.foreach { socket => sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket) })
+    plaintextConns.foreach(_.close())
+    internalConns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+  }
 
   private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String)): Unit = {
+    val initialConnectionCount = connectionCount
     val adminClient = createAdminClient()
     TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get()
     waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
     adminClient.close()
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Admin client connection not closed")
+  }
+
+  private def waitForListener(listenerName: String): Unit = {
+    TestUtils.retry(maxWaitMs = 10000) {
+      try {
+        assertTrue(servers.head.socketServer.boundPort(ListenerName.normalised(listenerName)) > 0)
+      } catch {
+        case e: KafkaException => throw new AssertionError(e)
+      }
+    }
   }
 
   private def createAdminClient(): AdminClient = {
@@ -135,12 +201,59 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  private def sendProduceRequest(): ProduceResponse = {
+  private def produceRequest: ProduceRequest = {
     val topicPartition = new TopicPartition(topic, 0)
     val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
     val partitionRecords = Map(topicPartition -> memoryRecords)
-    val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
-    val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer)
-    ProduceResponse.parse(response, request.version)
+    ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
+  }
+
+  def connectionCount: Int = servers.head.socketServer.connectionCount(localAddress)
+
+  def connect(listener: String = "PLAINTEXT"): Socket = {
+    val listenerName = ListenerName.normalised(listener)
+    new Socket("localhost", servers.head.socketServer.boundPort(listenerName))
+  }
+
+  private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = {
+    val socket = connect(listener)
+    try {
+      verifyConnection(socket)
+    } finally {
+      socket.close()
+    }
+  }
+
+  private def verifyConnection(socket: Socket): Unit = {
+    val request = produceRequest
+    val response = sendAndReceive(request, ApiKeys.PRODUCE, socket)
+    val produceResponse = ProduceResponse.parse(response, request.version)
+    assertEquals(1, produceResponse.responses.size)
+    val (_, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(Errors.NONE, partitionResponse.error)
+  }
+
+  private def verifyMaxConnections(maxConnections: Int, connectWithFailure: () => Unit): Unit = {
+    val initialConnectionCount = connectionCount
+
+    // create connections up to maxConnections - 1, leave space for one more
+    var conns = (connectionCount until (maxConnections - 1)).map(_ => connect("PLAINTEXT"))
+
+    // produce should succeed on a new connection
+    createAndVerifyConnection()
+
+    TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "produce request connection is not closed")
+    conns = conns :+ connect("PLAINTEXT")
+
+    // now try one more (should fail)
+    intercept[IOException](connectWithFailure.apply())
+
+    // close one connection
+    conns.head.close()
+    TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "connection is not closed")
+    createAndVerifyConnection()
+
+    conns.foreach(_.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 45ef18f5187..5d20da64cb6 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -186,6 +186,12 @@ class DynamicBrokerConfigTest extends JUnitSuite {
 
     //test invalid address
     verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig = true, expectFailure = true)
+
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = false, expectFailure = false)
+    val listenerMaxConnectionsProp = s"listener.name.external.${KafkaConfig.MaxConnectionsProp}"
+    verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = false, expectFailure = false)
   }
 
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {