Browse Source

kafka-1562; kafka-topics.sh alter add partitions resets cleanup.policy; patched by Jonathan Natkins; reviewed by Jun Rao

pull/29/merge
Jonathan Natkins 10 years ago committed by Jun Rao
parent
commit
f8d521a961
  1. 10
      core/src/main/scala/kafka/admin/AdminUtils.scala
  2. 4
      core/src/main/scala/kafka/admin/TopicCommand.scala
  3. 63
      core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

10
core/src/main/scala/kafka/admin/AdminUtils.scala

@ -96,8 +96,14 @@ object AdminUtils extends Logging { @@ -96,8 +96,14 @@ object AdminUtils extends Logging {
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
* @param config Pre-existing properties that should be preserved
*/
def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) {
def addPartitions(zkClient: ZkClient,
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
checkBrokerAvailable: Boolean = true,
config: Properties = new Properties) {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@ -124,7 +130,7 @@ object AdminUtils extends Logging { @@ -124,7 +130,7 @@ object AdminUtils extends Logging {
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {

4
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -95,11 +95,11 @@ object TopicCommand { @@ -95,11 +95,11 @@ object TopicCommand {
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
topics.foreach { topic =>
val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
// compile the final set of configs
val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))
AdminUtils.changeTopicConfig(zkClient, topic, configs)
@ -113,7 +113,7 @@ object TopicCommand { @@ -113,7 +113,7 @@ object TopicCommand {
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs)
println("Adding partitions succeeded!")
}
}

63
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

@ -0,0 +1,63 @@ @@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.KafkaConfig
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@Test
def testConfigPreservationAcrossPartitionAlteration() {
val topic = "test"
val numPartitionsOriginal = 1
val cleanupKey = "cleanup.policy"
val cleanupVal = "compact"
// create brokers
val brokers = List(0, 1, 2)
TestUtils.createBrokersInZk(zkClient, brokers)
// create the topic
val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
TopicCommand.createTopic(zkClient, createOpts)
val props = AdminUtils.fetchTopicConfig(zkClient, topic)
assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
// pre-create the topic config changes path to avoid a NoNodeException
ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath)
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString,
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
TopicCommand.alterTopic(zkClient, alterOpts)
val newProps = AdminUtils.fetchTopicConfig(zkClient, topic)
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}
}
Loading…
Cancel
Save