
kafka-1864; Revisit defaults for the internal offsets topic; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede, and Gwen Shapira

Branch: pull/38/merge
Jun Rao committed 10 years ago
Parent commit: 5174df5377
Changed files:
  1. core/src/main/scala/kafka/server/KafkaApis.scala (11 changed lines)
  2. core/src/main/scala/kafka/server/KafkaConfig.scala (6 changed lines)
  3. core/src/main/scala/kafka/server/OffsetManager.scala (4 changed lines)
  4. core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala (7 changed lines)
  5. core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala (7 changed lines)

core/src/main/scala/kafka/server/KafkaApis.scala (11 changed lines)

@@ -351,10 +351,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
       try {
         if (topic == OffsetManager.OffsetsTopicName) {
-          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+          val aliveBrokers = metadataCache.getAliveBrokers
+          val offsetsTopicReplicationFactor =
+            if (aliveBrokers.length > 0)
+              Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
+            else
+              config.offsetsTopicReplicationFactor
+          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
+                                 offsetsTopicReplicationFactor,
                                  offsetManager.offsetsTopicConfig)
           info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-               .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
+               .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
         }
         else {
           AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)

core/src/main/scala/kafka/server/KafkaConfig.scala (6 changed lines)

@@ -312,7 +312,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val offsetsLoadBufferSize = props.getIntInRange("offsets.load.buffer.size",
     OffsetManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE))
-  /** The replication factor for the offset commit topic (set higher to ensure availability). */
+  /** The replication factor for the offsets topic (set higher to ensure availability). To
+   *  ensure that the effective replication factor of the offsets topic is the configured value,
+   *  the number of alive brokers has to be at least the replication factor at the time of the
+   *  first request for the offsets topic. If not, either the offsets topic creation will fail or
+   *  it will get a replication factor of min(alive brokers, configured replication factor) */
   val offsetsTopicReplicationFactor: Short = props.getShortInRange("offsets.topic.replication.factor",
     OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor, (1, Short.MaxValue))
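The doc comment above describes the fallback rule used during auto-creation of the offsets topic. A minimal sketch of that rule, assuming illustrative parameter and method names (only the min(alive brokers, configured factor) logic comes from this patch):

    // Sketch of the effective replication factor rule described in the comment above.
    // `configuredFactor` and `aliveBrokerCount` are illustrative names, not identifiers from this patch.
    def effectiveOffsetsTopicReplicationFactor(configuredFactor: Short, aliveBrokerCount: Int): Short =
      if (aliveBrokerCount > 0)
        Math.min(configuredFactor.toInt, aliveBrokerCount).toShort
      else
        configuredFactor

    // Example: a configured factor of 3 with 2 alive brokers yields an effective factor of 2;
    // with 0 alive brokers the configured factor is passed through and topic creation fails.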

core/src/main/scala/kafka/server/OffsetManager.scala (4 changed lines)

@@ -75,9 +75,9 @@ object OffsetManagerConfig {
   val DefaultMaxMetadataSize = 4096
   val DefaultLoadBufferSize = 5*1024*1024
   val DefaultOffsetsRetentionCheckIntervalMs = 600000L
-  val DefaultOffsetsTopicNumPartitions = 1
+  val DefaultOffsetsTopicNumPartitions = 50
   val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
-  val DefaultOffsetsTopicReplicationFactor = 1.toShort
+  val DefaultOffsetsTopicReplicationFactor = 3.toShort
   val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
   val DefaultOffsetCommitTimeoutMs = 5000
   val DefaultOffsetCommitRequiredAcks = (-1).toShort
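These new defaults only take effect when the offsets topic is first auto-created. A minimal sketch of overriding them in a broker Properties config, mirroring what the test changes below do for the replication factor (the offsets.topic.num.partitions key is an assumption, not something shown in this patch):

    import java.util.Properties

    // Sketch: setting the offsets topic properties explicitly on a broker.
    // "offsets.topic.replication.factor" is the key used in the test changes below;
    // "offsets.topic.num.partitions" is assumed here and does not appear in this patch.
    val brokerProps = new Properties()
    brokerProps.put("offsets.topic.replication.factor", "3") // default after this change
    brokerProps.put("offsets.topic.num.partitions", "50")    // default after this change (assumed key)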

core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala (7 changed lines)

@@ -46,6 +46,13 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     override val zkConnect = TestZKUtils.zookeeperConnect
     override val autoCreateTopicsEnable = false
     override val messageMaxBytes = serverMessageMaxBytes
+    // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
+    // the broker. As a result, the live broker list in metadataCache is empty. If the number of live brokers is 0, we
+    // try to create the offsets topic with the default offsets.topic.replication.factor of 3. The creation will fail
+    // since there are not enough live brokers. This causes testCannotSendToInternalTopic() to fail. Temporarily fixing
+    // the issue by overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we need to
+    // remove the following config override.
+    override val offsetsTopicReplicationFactor = 1.asInstanceOf[Short]
   }

core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala (7 changed lines)

@@ -46,6 +46,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
+    // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
+    // the broker. As a result, the live broker list in metadataCache is empty. This causes the ConsumerMetadataRequest
+    // to fail: if the number of live brokers is 0, we try to create the offsets topic with the default
+    // offsets.topic.replication.factor of 3. The creation will fail since there are not enough live brokers. In order
+    // for the unit test to pass, we override offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we
+    // need to remove the following config override.
+    config.put("offsets.topic.replication.factor", "1")
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
