Browse Source

KAFKA-991; Reduce the queue size in hadoop producer; patched by Swapnil Ghike, reviewed by Jay Kreps and Joel Koshy.

pull/6/merge
Joel Koshy 11 years ago
parent
commit
8edd3e6302
  1. 7
      contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
  2. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala

7
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java

@ -40,8 +40,11 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V> @@ -40,8 +40,11 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
private Logger log = Logger.getLogger(KafkaOutputFormat.class);
public static final String KAFKA_URL = "kafka.output.url";
/** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window) */
public static final int KAFKA_QUEUE_SIZE = 10*1024*1024;
/** Bytes to buffer before the OutputFormat does a send (i.e., the amortization window):
* We set the default to a million bytes so that the server will not reject the batch of messages
* with a MessageSizeTooLargeException. The actual size will be smaller after compression.
*/
public static final int KAFKA_QUEUE_SIZE = 1000000;
public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
private static final Map<String, String> kafkaConfigMap;

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -18,7 +18,7 @@ @@ -18,7 +18,7 @@
package kafka.server
import java.util.Properties
import kafka.message.Message
import kafka.message.{MessageSet, Message}
import kafka.consumer.ConsumerConfig
import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
@ -38,7 +38,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro @@ -38,7 +38,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
/* the maximum size of message that the server can receive */
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))

Loading…
Cancel
Save