Browse Source

KAFKA-1512 Fixes for limit the maximum number of connections per ip address patch by Jeff Holoman reviewed by Jay Krepps and Gwen Shapira

pull/38/merge
Joe Stein 10 years ago
parent
commit
f82518a850
  1. 2
      core/src/main/scala/kafka/network/SocketServer.scala
  2. 3
      core/src/main/scala/kafka/server/KafkaServer.scala
  3. 45
      core/src/test/scala/unit/kafka/network/SocketServerTest.scala

2
core/src/main/scala/kafka/network/SocketServer.scala

@ -47,7 +47,7 @@ class SocketServer(val brokerId: Int, @@ -47,7 +47,7 @@ class SocketServer(val brokerId: Int,
val maxRequestSize: Int = Int.MaxValue,
val maxConnectionsPerIp: Int = Int.MaxValue,
val connectionsMaxIdleMs: Long,
val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup {
val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)

3
core/src/main/scala/kafka/server/KafkaServer.scala

@ -94,7 +94,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -94,7 +94,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
config.socketReceiveBufferBytes,
config.socketRequestMaxBytes,
config.maxConnectionsPerIp,
config.connectionsMaxIdleMs)
config.connectionsMaxIdleMs,
config.maxConnectionsPerIpOverrides)
socketServer.startup()
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

45
core/src/test/scala/unit/kafka/network/SocketServerTest.scala

@ -30,6 +30,7 @@ import kafka.common.TopicAndPartition @@ -30,6 +30,7 @@ import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import java.nio.channels.SelectionKey
import kafka.utils.TestUtils
import scala.collection.Map
class SocketServerTest extends JUnitSuite {
@ -42,7 +43,8 @@ class SocketServerTest extends JUnitSuite { @@ -42,7 +43,8 @@ class SocketServerTest extends JUnitSuite {
recvBufferSize = 300000,
maxRequestSize = 50,
maxConnectionsPerIp = 5,
connectionsMaxIdleMs = 60*1000)
connectionsMaxIdleMs = 60*1000,
maxConnectionsPerIpOverrides = Map.empty[String,Int])
server.startup()
def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@ -71,13 +73,12 @@ class SocketServerTest extends JUnitSuite { @@ -71,13 +73,12 @@ class SocketServerTest extends JUnitSuite {
channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
}
def connect() = new Socket("localhost", server.port)
def connect(s:SocketServer = server) = new Socket("localhost", s.port)
@After
def cleanup() {
server.shutdown()
}
@Test
def simpleRequest() {
val socket = connect()
@ -141,19 +142,39 @@ class SocketServerTest extends JUnitSuite { @@ -141,19 +142,39 @@ class SocketServerTest extends JUnitSuite {
// doing a subsequent send should throw an exception as the connection should be closed.
sendRequest(socket, 0, bytes)
}
@Test
def testMaxConnectionsPerIp() {
// make the maximum allowable number of connections and then leak them
val conns = (0 until server.maxConnectionsPerIp).map(i => connect())
// now try one more (should fail)
try {
val conn = connect()
sendRequest(conn, 100, "hello".getBytes)
assertEquals(-1, conn.getInputStream().read())
} catch {
case e: IOException => // this is good
}
val conn = connect()
conn.setSoTimeout(3000)
assertEquals(-1, conn.getInputStream().read())
}
@Test
def testMaxConnectionsPerIPOverrides(): Unit = {
val overrideNum = 6
val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
val overrideServer: SocketServer = new SocketServer(0,
host = null,
port = kafka.utils.TestUtils.choosePort,
numProcessorThreads = 1,
maxQueuedRequests = 50,
sendBufferSize = 300000,
recvBufferSize = 300000,
maxRequestSize = 50,
maxConnectionsPerIp = 5,
connectionsMaxIdleMs = 60*1000,
maxConnectionsPerIpOverrides = overrides)
overrideServer.startup()
// make the maximum allowable number of connections and then leak them
val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
// now try one more (should fail)
val conn = connect(overrideServer)
conn.setSoTimeout(3000)
assertEquals(-1, conn.getInputStream.read())
overrideServer.shutdown()
}
}

Loading…
Cancel
Save