
KAFKA-7730; Limit number of active connections per listener in brokers (KIP-402)

Adds a new config `max.connections` to limit the number of active connections on each listener. The config may be prefixed with a listener prefix (for example, `listener.name.internal.max.connections`) to set a listener-specific limit. The limit may be dynamically reconfigured without restarting the broker.
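
For illustration only (the exact command form and values below are assumptions, not part of this patch), the limits could be adjusted at runtime with Kafka's dynamic-config tooling:

# Sketch: set a cluster-wide default for the broker-wide limit
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default \
    --alter --add-config max.connections=1000

# Sketch: cap a hypothetical listener named INTERNAL on broker 0 via the listener prefix
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 \
    --alter --add-config listener.name.internal.max.connections=200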

This is one of the PRs for KIP-402 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors). Note that this is currently built on top of PR #6022.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Gwen Shapira <cshapi@gmail.com>

Closes #6034 from rajinisivaram/KAFKA-7730-max-connections
Commit: 3ec2ca5e33
  1. clients/src/main/java/org/apache/kafka/common/network/Selector.java (22 changes)
  2. clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java (30 changes)
  3. core/src/main/scala/kafka/network/SocketServer.scala (209 changes)
  4. core/src/main/scala/kafka/server/DynamicBrokerConfig.scala (40 changes)
  5. core/src/main/scala/kafka/server/KafkaConfig.scala (19 changes)
  6. core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala (217 changes)
  7. core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala (6 changes)

clients/src/main/java/org/apache/kafka/common/network/Selector.java (22 changes)

@@ -919,6 +919,28 @@ public class Selector implements Selectable, AutoCloseable {
return closingChannels.get(id);
}
/**
* Returns the lowest priority channel chosen using the following sequence:
* 1) If one or more channels are in closing state, return any one of them
* 2) If idle expiry manager is enabled, return the least recently updated channel
* 3) Otherwise return any of the channels
*
* This method is used to close a channel to accommodate a new channel on the inter-broker listener
* when the broker-wide `max.connections` limit is enabled.
*/
public KafkaChannel lowestPriorityChannel() {
KafkaChannel channel = null;
if (!closingChannels.isEmpty()) {
channel = closingChannels.values().iterator().next();
} else if (idleExpiryManager != null && !idleExpiryManager.lruConnections.isEmpty()) {
String channelId = idleExpiryManager.lruConnections.keySet().iterator().next();
channel = channel(channelId);
} else if (!channels.isEmpty()) {
channel = channels.values().iterator().next();
}
return channel;
}
/**
* Get the channel associated with selectionKey
*/

clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java (30 changes)

@@ -688,6 +688,36 @@ public class SelectorTest {
assertEquals((double) conns, getMetric("connection-count").metricValue());
}
@SuppressWarnings("unchecked")
@Test
public void testLowestPriorityChannel() throws Exception {
int conns = 5;
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
for (int i = 0; i < conns; i++) {
connect(String.valueOf(i), addr);
}
assertNotNull(selector.lowestPriorityChannel());
for (int i = conns - 1; i >= 0; i--) {
if (i != 2)
assertEquals("", blockingRequest(String.valueOf(i), ""));
time.sleep(10);
}
assertEquals("2", selector.lowestPriorityChannel().id());
Field field = Selector.class.getDeclaredField("closingChannels");
field.setAccessible(true);
Map<String, KafkaChannel> closingChannels = (Map<String, KafkaChannel>) field.get(selector);
closingChannels.put("3", selector.channel("3"));
assertEquals("3", selector.lowestPriorityChannel().id());
closingChannels.remove("3");
for (int i = 0; i < conns; i++) {
selector.close(String.valueOf(i));
}
assertNull(selector.lowestPriorityChannel());
}
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);

core/src/main/scala/kafka/network/SocketServer.scala (209 changes)

@@ -21,6 +21,7 @@ import java.io.IOException
import java.net._
import java.nio.channels._
import java.nio.channels.{Selector => NSelector}
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.function.Supplier
@@ -32,15 +33,16 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingRespo
import kafka.network.Processor._
import kafka.network.SocketServer._
import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.{KafkaException, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Meter
import org.apache.kafka.common.metrics.stats.Total
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -70,7 +72,11 @@ import scala.util.control.ControlThrowable
* Acceptor has 1 Processor thread that has its own selector and reads requests from the socket.
* 1 Handler thread that handles requests and produces responses back to the processor thread for writing.
*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
private val maxQueuedRequests = config.queuedMaxRequests
@@ -109,7 +115,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
*/
def startup(startupProcessors: Boolean = true) {
this.synchronized {
connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
connectionQuotas = new ConnectionQuotas(config, time)
createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
if (startupProcessors) {
@@ -212,6 +218,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {
endpoints.foreach { endpoint =>
connectionQuotas.addListener(config, endpoint.listenerName)
val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
@@ -223,6 +230,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
endpointOpt.foreach { endpoint =>
connectionQuotas.addListener(config, endpoint.listenerName)
val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
@@ -324,18 +332,33 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
info(s"Removing data-plane listeners for endpoints $listenersRemoved")
listenersRemoved.foreach { endpoint =>
connectionQuotas.removeListener(config, endpoint.listenerName)
dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
}
}
def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
override def reconfigurableConfigs: Set[String] = SocketServer.ReconfigurableConfigs
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
}
def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]): Unit = {
info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val maxConnectionsPerIp = newConfig.maxConnectionsPerIp
if (maxConnectionsPerIp != oldConfig.maxConnectionsPerIp) {
info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
}
val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
if (maxConnectionsPerIpOverrides != oldConfig.maxConnectionsPerIpOverrides) {
info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
}
val maxConnections = newConfig.maxConnections
if (maxConnections != oldConfig.maxConnections) {
info(s"Updating broker-wide maxConnections: $maxConnections")
connectionQuotas.updateBrokerMaxConnections(maxConnections)
}
}
// `protected` for test usage
@@ -373,6 +396,13 @@ object SocketServer {
val ControlPlaneThreadPrefix = "control-plane"
val DataPlaneMetricPrefix = ""
val ControlPlaneMetricPrefix = "ControlPlane"
val ReconfigurableConfigs = Set(
KafkaConfig.MaxConnectionsPerIpProp,
KafkaConfig.MaxConnectionsPerIpOverridesProp,
KafkaConfig.MaxConnectionsProp)
val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp)
}
/**
@@ -427,10 +457,10 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
/**
* Close `channel` and decrement the connection count.
*/
def close(channel: SocketChannel): Unit = {
def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
if (channel != null) {
debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
connectionQuotas.dec(channel.socket.getInetAddress)
connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
CoreUtils.swallow(channel.close(), this, Level.ERROR)
}
@@ -500,6 +530,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
var currentProcessorIndex = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
@@ -508,6 +539,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
@@ -582,7 +614,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
@@ -592,7 +624,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
} catch {
case e: TooManyConnectionsException =>
info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
close(socketChannel)
close(endPoint.listenerName, socketChannel)
None
}
}
@@ -731,6 +763,7 @@ private[kafka] class Processor(val id: Int,
processCompletedReceives()
processCompletedSends()
processDisconnected()
closeExcessConnections()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
@@ -911,13 +944,21 @@ private[kafka] class Processor(val id: Int,
}.remoteHost
inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
// the channel has been closed by the selector but the quotas still need to be updated
connectionQuotas.dec(InetAddress.getByName(remoteHost))
connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
} catch {
case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
}
}
}
private def closeExcessConnections(): Unit = {
if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
val channel = selector.lowestPriorityChannel()
if (channel != null)
close(channel.id)
}
}
/**
* Close the connection identified by `connectionId` and decrement the connection count.
* The channel will be immediately removed from the selector's `channels` or `closingChannels`
@@ -930,7 +971,7 @@ private[kafka] class Processor(val id: Int,
debug(s"Closing selector connection $connectionId")
val address = channel.socketAddress
if (address != null)
connectionQuotas.dec(address)
connectionQuotas.dec(listenerName, address)
selector.close(connectionId)
inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
@@ -977,7 +1018,7 @@ private[kafka] class Processor(val id: Int,
case e: Throwable =>
val remoteAddress = channel.socket.getRemoteSocketAddress
// need to close the channel here to avoid a socket leak.
close(channel)
close(listenerName, channel)
processException(s"Processor $id closed connection from $remoteAddress", e)
}
}
@@ -1058,31 +1099,72 @@ private[kafka] class Processor(val id: Int,
}
class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
@volatile private var defaultMaxConnectionsPerIp = defaultMax
@volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
@volatile private var brokerMaxConnections = config.maxConnections
private val counts = mutable.Map[InetAddress, Int]()
def inc(address: InetAddress) {
// Listener counts and configs are synchronized on `counts`
private val listenerCounts = mutable.Map[ListenerName, Int]()
private val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
@volatile private var totalCount = 0
def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter) {
counts.synchronized {
waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
val count = counts.getOrElseUpdate(address, 0)
counts.put(address, count + 1)
totalCount += 1
if (listenerCounts.contains(listenerName)) {
listenerCounts.put(listenerName, listenerCounts(listenerName) + 1)
}
val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp)
if (count >= max)
throw new TooManyConnectionsException(address, max)
}
}
def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
private[network] def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
defaultMaxConnectionsPerIp = maxConnectionsPerIp
}
def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
}
def dec(address: InetAddress) {
private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit = {
counts.synchronized {
brokerMaxConnections = maxConnections
counts.notifyAll()
}
}
private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
counts.synchronized {
if (!maxConnectionsPerListener.contains(listenerName)) {
val newListenerQuota = new ListenerConnectionQuota(counts, listenerName)
maxConnectionsPerListener.put(listenerName, newListenerQuota)
listenerCounts.put(listenerName, 0)
config.addReconfigurable(newListenerQuota)
}
counts.notifyAll()
}
}
private[network] def removeListener(config: KafkaConfig, listenerName: ListenerName): Unit = {
counts.synchronized {
maxConnectionsPerListener.remove(listenerName).foreach { listenerQuota =>
listenerCounts.remove(listenerName)
counts.notifyAll() // wake up any waiting acceptors to close cleanly
config.removeReconfigurable(listenerQuota)
}
}
}
def dec(listenerName: ListenerName, address: InetAddress) {
counts.synchronized {
val count = counts.getOrElse(address,
throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address"))
@@ -1090,6 +1172,19 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
counts.remove(address)
else
counts.put(address, count - 1)
if (totalCount <= 0)
error(s"Attempted to decrease total connection count for broker with no connections")
totalCount -= 1
if (maxConnectionsPerListener.contains(listenerName)) {
val listenerCount = listenerCounts(listenerName)
if (listenerCount == 0)
error(s"Attempted to decrease connection count for listener $listenerName with no connections")
else
listenerCounts.put(listenerName, listenerCount - 1)
}
counts.notifyAll() // wake up any acceptors waiting to process a new connection since listener connection limit was reached
}
}
@@ -1097,6 +1192,72 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
counts.getOrElse(address, 0)
}
private def waitForConnectionSlot(listenerName: ListenerName,
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
counts.synchronized {
if (!connectionSlotAvailable(listenerName)) {
val startNs = time.nanoseconds
do {
counts.wait()
} while (!connectionSlotAvailable(listenerName))
acceptorBlockedPercentMeter.mark(time.nanoseconds - startNs)
}
}
}
// This is invoked in every poll iteration, and we close at most one LRU connection
// per iteration if the limit has been exceeded
def maxConnectionsExceeded(listenerName: ListenerName): Boolean = {
totalCount > brokerMaxConnections && !protectedListener(listenerName)
}
private def connectionSlotAvailable(listenerName: ListenerName): Boolean = {
if (listenerCounts(listenerName) >= maxListenerConnections(listenerName))
false
else if (protectedListener(listenerName))
true
else
totalCount < brokerMaxConnections
}
private def protectedListener(listenerName: ListenerName): Boolean =
config.interBrokerListenerName == listenerName && config.listeners.size > 1
private def maxListenerConnections(listenerName: ListenerName): Int =
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
@volatile private var _maxConnections = Int.MaxValue
private val listenerPropName = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
def maxConnections: Int = _maxConnections
override def listenerName(): ListenerName = listener
override def configure(configs: util.Map[String, _]): Unit = {
_maxConnections = maxConnections(configs)
}
override def reconfigurableConfigs(): util.Set[String] = {
SocketServer.ListenerReconfigurableConfigs.asJava
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val value = maxConnections(configs)
if (value <= 0)
throw new ConfigException(s"Invalid value $value for $listenerPropName: the limit must be greater than 0")
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
lock.synchronized {
_maxConnections = maxConnections(configs)
lock.notifyAll()
}
}
private def maxConnections(configs: util.Map[String, _]): Int = {
Option(configs.get(KafkaConfig.MaxConnectionsProp)).map(_.toString.toInt).getOrElse(Int.MaxValue)
}
}
}
class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too many connections from $ip (maximum = $count)")
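
The admission path above blocks an acceptor inside `counts.synchronized` via `waitForConnectionSlot` until a slot frees up, and every `dec` and `updateBrokerMaxConnections` call invokes `counts.notifyAll()` to wake it. A minimal standalone sketch of that wait/notify pattern (names are illustrative, with none of the per-IP, per-listener, or protected-listener bookkeeping):

// Sketch only: simplified form of the blocking admission used by ConnectionQuotas.
// SimpleConnectionQuota and its members are hypothetical, not part of this patch.
class SimpleConnectionQuota(initialMax: Int) {
  private val lock = new Object
  private var maxConnections = initialMax
  private var count = 0

  // Acceptor thread: block while the broker-wide limit is reached.
  def inc(): Unit = lock.synchronized {
    while (count >= maxConnections)
      lock.wait()
    count += 1
  }

  // Connection closed: free a slot and wake any blocked acceptor.
  def dec(): Unit = lock.synchronized {
    count -= 1
    lock.notifyAll()
  }

  // Dynamic reconfiguration: a raised limit must also wake waiters.
  def updateMax(newMax: Int): Unit = lock.synchronized {
    maxConnections = newMax
    lock.notifyAll()
  }
}

In the patch itself the monitor is the `counts` map, and the wait loop re-checks `connectionSlotAvailable`, which also exempts the inter-broker listener so that cluster traffic is never starved.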

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala (40 changes)

@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogConfig, LogManager}
import kafka.network.SocketServer
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -81,10 +82,11 @@ object DynamicBrokerConfig {
DynamicThreadPool.ReconfigurableConfigs ++
Set(KafkaConfig.MetricReporterClassesProp) ++
DynamicListenerConfig.ReconfigurableConfigs ++
DynamicConnectionQuota.ReconfigurableConfigs
SocketServer.ReconfigurableConfigs
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
private val PerBrokerConfigs = DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs
DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
@@ -135,7 +137,13 @@ object DynamicBrokerConfig {
private def perBrokerConfigs(props: Properties): Set[String] = {
val configNames = props.asScala.keySet
configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
def perBrokerListenerConfig(name: String): Boolean = {
name match {
case ListenerConfigRegex(baseName) => !ClusterLevelListenerConfigs.contains(baseName)
case _ => false
}
}
configNames.intersect(PerBrokerConfigs) ++ configNames.filter(perBrokerListenerConfig)
}
private def nonDynamicConfigs(props: Properties): Set[String] = {
@@ -224,7 +232,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -789,7 +797,10 @@ object DynamicListenerConfig {
KafkaConfig.SaslLoginRefreshWindowFactorProp,
KafkaConfig.SaslLoginRefreshWindowJitterProp,
KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp,
KafkaConfig.SaslLoginRefreshBufferSecondsProp
KafkaConfig.SaslLoginRefreshBufferSecondsProp,
// Connection limit configs
KafkaConfig.MaxConnectionsProp
)
}
@@ -884,24 +895,5 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
}
object DynamicConnectionQuota {
val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp)
}
class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable {
override def reconfigurableConfigs: Set[String] = {
DynamicConnectionQuota.ReconfigurableConfigs
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
}
}

core/src/main/scala/kafka/server/KafkaConfig.scala (19 changes)

@@ -75,6 +75,7 @@ object Defaults {
val SocketRequestMaxBytes: Int = 100 * 1024 * 1024
val MaxConnectionsPerIp: Int = Int.MaxValue
val MaxConnectionsPerIpOverrides: String = ""
val MaxConnections: Int = Int.MaxValue
val ConnectionsMaxIdleMs = 10 * 60 * 1000L
val RequestTimeoutMs = 30000
val FailedAuthenticationDelayMs = 100
@@ -293,6 +294,7 @@ object KafkaConfig {
val SocketRequestMaxBytesProp = "socket.request.max.bytes"
val MaxConnectionsPerIpProp = "max.connections.per.ip"
val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
val MaxConnectionsProp = "max.connections"
val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
val FailedAuthenticationDelayMsProp = "connection.failed.authentication.delay.ms"
/***************** rack configuration *************/
@@ -570,8 +572,15 @@ object KafkaConfig {
val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides " +
"configured using " + MaxConnectionsPerIpOverridesProp + " property"
val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is \"hostName:100,127.0.0.1:200\""
s"configured using $MaxConnectionsPerIpOverridesProp property. New connections from the ip address are dropped if the limit is reached."
val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. " +
"An example value is \"hostName:100,127.0.0.1:200\""
val MaxConnectionsDoc = "The maximum number of connections we allow in the broker at any time. This limit is applied in addition " +
s"to any per-ip limits configured using $MaxConnectionsPerIpProp. Listener-level limits may also be configured by prefixing the " +
s"config name with the listener prefix, for example, <code>listener.name.internal.$MaxConnectionsProp</code>. The broker-wide limit " +
"should be configured based on broker capacity, while listener limits should be configured based on application requirements. " +
"New connections are blocked if either the listener or the broker limit is reached. Connections on the inter-broker listener are " +
"permitted even if the broker-wide limit is reached. The least recently used connection on another listener will be closed in this case."
val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
val FailedAuthenticationDelayMsDoc = "Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. " +
s"This must be configured to be less than $ConnectionsMaxIdleMsProp to prevent connection timeout."
@@ -872,6 +881,7 @@ object KafkaConfig {
.define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
.define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
.define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
.define(MaxConnectionsProp, INT, Defaults.MaxConnections, atLeast(0), MEDIUM, MaxConnectionsDoc)
.define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
.define(FailedAuthenticationDelayMsProp, INT, Defaults.FailedAuthenticationDelayMs, atLeast(0), LOW, FailedAuthenticationDelayMsDoc)
@@ -1163,6 +1173,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
val maxConnectionsPerIpOverrides: Map[String, Int] =
getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
def maxConnections = getInt(KafkaConfig.MaxConnectionsProp)
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
val failedAuthenticationDelayMs = getInt(KafkaConfig.FailedAuthenticationDelayMsProp)
@@ -1331,6 +1342,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
dynamicConfig.addReconfigurable(reconfigurable)
}
def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.removeReconfigurable(reconfigurable)
}
def logRetentionTimeMillis: Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
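
For illustration of the doc strings above, the static form of these limits in server.properties might look like this (the listener name and values are hypothetical):

# Broker-wide cap, applied across all listeners
max.connections=1000
# Per-listener cap using the listener prefix
listener.name.internal.max.connections=200
# Existing per-IP limit, still enforced in addition to the new limits
max.connections.per.ip=100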

core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala (217 changes)

@@ -21,18 +21,19 @@ package kafka.network
import java.io.IOException
import java.net.{InetAddress, Socket}
import java.util.Properties
import java.util.concurrent._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert.assertEquals
import org.junit.{Before, Test}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
@@ -41,6 +42,9 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
override def numBrokers = 1
val topic = "test"
val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val localAddress = InetAddress.getByName("127.0.0.1")
var executor: ExecutorService = _
@Before
override def setUp(): Unit = {
@@ -48,76 +52,138 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
}
@Test
def testDynamicConnectionQuota(): Unit = {
def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
@After
override def tearDown(): Unit = {
try {
if (executor != null) {
executor.shutdownNow()
assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS))
}
} finally {
super.tearDown()
}
}
val socketServer = servers.head.socketServer
val localAddress = InetAddress.getByName("127.0.0.1")
def connectionCount = socketServer.connectionCount(localAddress)
override protected def propertyOverrides(properties: Properties): Unit = {
super.propertyOverrides(properties)
}
@Test
def testDynamicConnectionQuota(): Unit = {
val initialConnectionCount = connectionCount
val maxConnectionsPerIP = 5
def connectAndVerify: Unit = {
val socket = connect()
try {
sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
} finally {
socket.close()
}
}
val props = new Properties
props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString))
//wait for adminClient connections to close
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
//create connections up to maxConnectionsPerIP - 1, leave space for one connection
var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer))
// produce should succeed
var produceResponse = sendProduceRequest()
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(Errors.NONE, partitionResponse.error)
TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIP - 1), "produce request connection is not closed")
conns = conns :+ connect(socketServer)
// now try one more (should fail)
intercept[IOException](sendProduceRequest())
conns.foreach(conn => conn.close())
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
verifyMaxConnections(maxConnectionsPerIP, () => connectAndVerify)
// Increase MaxConnectionsPerIpOverrides for localhost to 7
val maxConnectionsPerIPOverride = 7
props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride"))
//wait for adminClient connections to close
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")
verifyMaxConnections(maxConnectionsPerIPOverride, () => connectAndVerify)
}
//create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection
conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer))
@Test
def testDynamicListenerConnectionQuota(): Unit = {
val socketServer = servers.head.socketServer
val initialConnectionCount = connectionCount
// send should succeed
produceResponse = sendProduceRequest()
assertEquals(1, produceResponse.responses.size)
val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
assertEquals(Errors.NONE, partitionResponse1.error)
def connectAndVerify(): Unit = {
val socket = connect("PLAINTEXT")
socket.setSoTimeout(1000)
try {
val response = sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket)
assertEquals(0, response.remaining)
} finally {
socket.close()
}
}
TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "produce request connection is not closed")
conns = conns :+ connect(socketServer)
// now try one more (should fail)
intercept[IOException](sendProduceRequest())
// Reduce total broker MaxConnections to 5 at the cluster level
val props = new Properties
props.put(KafkaConfig.MaxConnectionsProp, "5")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsProp, "5"))
verifyMaxConnections(5, () => connectAndVerify)
//close one connection
conns.head.close()
TestUtils.waitUntilTrue(() => connectionCount == (maxConnectionsPerIPOverride - 1), "connection is not closed")
// send should succeed
sendProduceRequest()
// Create another listener and verify listener connection limit of 5 for each listener
val newListeners = "PLAINTEXT://localhost:0,INTERNAL://localhost:0"
props.put(KafkaConfig.ListenersProp, newListeners)
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT")
props.put(KafkaConfig.MaxConnectionsProp, "10")
props.put("listener.name.internal.max.connections", "5")
props.put("listener.name.plaintext.max.connections", "5")
reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
waitForListener("INTERNAL")
var conns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
conns ++= (5 until 10).map(_ => connect("INTERNAL"))
conns.foreach(verifyConnection)
conns.foreach(_.close())
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
// Increase MaxConnections for PLAINTEXT listener to 7 at the broker level
val maxConnectionsPlaintext = 7
val listenerProp = s"${listener.configPrefix}${KafkaConfig.MaxConnectionsProp}"
props.put(listenerProp, maxConnectionsPlaintext.toString)
reconfigureServers(props, perBrokerConfig = true, (listenerProp, maxConnectionsPlaintext.toString))
verifyMaxConnections(maxConnectionsPlaintext, () => connectAndVerify)
// Verify that connection blocked on the limit connects successfully when an existing connection is closed
val plaintextConnections = (connectionCount until maxConnectionsPlaintext).map(_ => connect("PLAINTEXT"))
executor = Executors.newSingleThreadExecutor
val future = executor.submit(CoreUtils.runnable { createAndVerifyConnection() })
Thread.sleep(100)
assertFalse(future.isDone)
plaintextConnections.head.close()
future.get(30, TimeUnit.SECONDS)
plaintextConnections.foreach(_.close())
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
// Verify that connections on inter-broker listener succeed even if broker max connections has been
// reached by closing connections on another listener
var plaintextConns = (connectionCount until 5).map(_ => connect("PLAINTEXT"))
val internalConns = (5 until 10).map(_ => connect("INTERNAL"))
plaintextConns.foreach(verifyConnection)
internalConns.foreach(verifyConnection)
plaintextConns ++= (0 until 2).map(_ => connect("PLAINTEXT"))
TestUtils.waitUntilTrue(() => connectionCount <= 10, "Internal connections not closed")
plaintextConns.foreach(verifyConnection)
intercept[IOException](internalConns.foreach { socket => sendAndReceive(produceRequest, ApiKeys.PRODUCE, socket) })
plaintextConns.foreach(_.close())
internalConns.foreach(_.close())
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
}
private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String)): Unit = {
val initialConnectionCount = connectionCount
val adminClient = createAdminClient()
TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get()
waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
adminClient.close()
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Admin client connection not closed")
}
private def waitForListener(listenerName: String): Unit = {
TestUtils.retry(maxWaitMs = 10000) {
try {
assertTrue(servers.head.socketServer.boundPort(ListenerName.normalised(listenerName)) > 0)
} catch {
case e: KafkaException => throw new AssertionError(e)
}
}
}
private def createAdminClient(): AdminClient = {
@@ -135,12 +201,59 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
}
private def sendProduceRequest(): ProduceResponse = {
private def produceRequest: ProduceRequest = {
val topicPartition = new TopicPartition(topic, 0)
val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
val partitionRecords = Map(topicPartition -> memoryRecords)
val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer)
ProduceResponse.parse(response, request.version)
ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
}
def connectionCount: Int = servers.head.socketServer.connectionCount(localAddress)
def connect(listener: String): Socket = {
val listenerName = ListenerName.normalised(listener)
new Socket("localhost", servers.head.socketServer.boundPort(listenerName))
}
private def createAndVerifyConnection(listener: String = "PLAINTEXT"): Unit = {
val socket = connect(listener)
try {
verifyConnection(socket)
} finally {
socket.close()
}
}
private def verifyConnection(socket: Socket): Unit = {
val request = produceRequest
val response = sendAndReceive(request, ApiKeys.PRODUCE, socket)
val produceResponse = ProduceResponse.parse(response, request.version)
assertEquals(1, produceResponse.responses.size)
val (_, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(Errors.NONE, partitionResponse.error)
}
private def verifyMaxConnections(maxConnections: Int, connectWithFailure: () => Unit): Unit = {
val initialConnectionCount = connectionCount
//create connections up to maxConnections - 1, leave space for one connection
var conns = (connectionCount until (maxConnections - 1)).map(_ => connect("PLAINTEXT"))
// produce should succeed on a new connection
createAndVerifyConnection()
TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "produce request connection is not closed")
conns = conns :+ connect("PLAINTEXT")
// now try one more (should fail)
intercept[IOException](connectWithFailure.apply())
//close one connection
conns.head.close()
TestUtils.waitUntilTrue(() => connectionCount == (maxConnections - 1), "connection is not closed")
createAndVerifyConnection()
conns.foreach(_.close())
TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
}
}

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala (6 changes)

@@ -186,6 +186,12 @@ class DynamicBrokerConfigTest extends JUnitSuite {
//test invalid address
verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig = true,
expectFailure = true)
verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = true, expectFailure = false)
verifyConfigUpdate(KafkaConfig.MaxConnectionsProp, "100", perBrokerConfig = false, expectFailure = false)
val listenerMaxConnectionsProp = s"listener.name.external.${KafkaConfig.MaxConnectionsProp}"
verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = true, expectFailure = false)
verifyConfigUpdate(listenerMaxConnectionsProp, "10", perBrokerConfig = false, expectFailure = false)
}
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
