Browse Source

KAFKA-724; Allow automatic socket.send.buffer from operating system in SocketServer

If socket.receive.buffer.bytes/socket.send.buffer.bytes are set to -1, use the OS defaults.

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1469 from rekhajoshm/KAFKA-724-rebased
pull/1330/head
Joshi 9 years ago committed by Ismael Juma
parent
commit
430bf56cdf
  1. 4
      clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
  2. 9
      core/src/main/scala/kafka/network/SocketServer.scala
  3. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala

4
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java

@ -43,10 +43,10 @@ public class CommonClientConfigs { @@ -43,10 +43,10 @@ public class CommonClientConfigs {
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.";
public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";

9
core/src/main/scala/kafka/network/SocketServer.scala

@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup @@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selector => KSelector}
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selectable, Selector => KSelector}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.protocol.types.SchemaException
@ -304,7 +304,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint, @@ -304,7 +304,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
new InetSocketAddress(host, port)
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
serverChannel.socket().setReceiveBufferSize(recvBufferSize)
try {
serverChannel.socket.bind(socketAddress)
info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
@ -326,7 +328,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint, @@ -326,7 +328,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
.format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -396,8 +396,8 @@ object KafkaConfig { @@ -396,8 +396,8 @@ object KafkaConfig {
val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
" If this is not set, the value for `listeners` will be used."
val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets"
val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets"
val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address"
val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections"

Loading…
Cancel
Save