From f8d521a9616134d78966419e5cf2aa73e8d6a5c5 Mon Sep 17 00:00:00 2001 From: Jonathan Natkins Date: Mon, 4 Aug 2014 07:21:25 -0700 Subject: [PATCH] kafka-1562; kafka-topics.sh alter add partitions resets cleanup.policy; patched by Jonathan Natkins; reviewed by Jun Rao --- .../main/scala/kafka/admin/AdminUtils.scala | 10 ++- .../main/scala/kafka/admin/TopicCommand.scala | 4 +- .../unit/kafka/admin/TopicCommandTest.scala | 63 +++++++++++++++++++ 3 files changed, 73 insertions(+), 4 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index b5d8714e964..94c53320b76 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -96,8 +96,14 @@ object AdminUtils extends Logging { * @param numPartitions Number of partitions to be set * @param replicaAssignmentStr Manual replica assignment * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing + * @param config Pre-existing properties that should be preserved */ - def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) { + def addPartitions(zkClient: ZkClient, + topic: String, + numPartitions: Int = 1, + replicaAssignmentStr: String = "", + checkBrokerAvailable: Boolean = true, + config: Properties = new Properties) { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) @@ -124,7 +130,7 @@ object AdminUtils extends Logging { val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list partitionReplicaList ++= newPartitionReplicaList - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = { diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8d5c2e7088f..003a09c6160 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -95,11 +95,11 @@ object TopicCommand { def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) topics.foreach { topic => + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { val configsToBeAdded = parseTopicConfigsToBeAdded(opts) val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) // compile the final set of configs - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) configs.putAll(configsToBeAdded) configsToBeDeleted.foreach(config => configs.remove(config)) AdminUtils.changeTopicConfig(zkClient, topic, configs) @@ -113,7 +113,7 @@ object TopicCommand { "logic or ordering of the messages will be affected") val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) - AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) + AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs) println("Adding partitions succeeded!") } } diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala new file mode 100644 index 00000000000..ac6dd2087de --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import junit.framework.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import kafka.utils.Logging +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.server.KafkaConfig +import kafka.admin.TopicCommand.TopicCommandOptions +import kafka.utils.ZkUtils + +class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { + + @Test + def testConfigPreservationAcrossPartitionAlteration() { + val topic = "test" + val numPartitionsOriginal = 1 + val cleanupKey = "cleanup.policy" + val cleanupVal = "compact" + // create brokers + val brokers = List(0, 1, 2) + TestUtils.createBrokersInZk(zkClient, brokers) + // create the topic + val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, + "--replication-factor", "1", + "--config", cleanupKey + "=" + cleanupVal, + "--topic", topic)) + TopicCommand.createTopic(zkClient, createOpts) + val props = AdminUtils.fetchTopicConfig(zkClient, topic) + assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey)) + assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal)) + + // pre-create the topic config changes path to avoid a NoNodeException + ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath) + + // modify the topic to add new partitions + val numPartitionsModified = 3 + val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, + "--config", cleanupKey + "=" + cleanupVal, + "--topic", topic)) + TopicCommand.alterTopic(zkClient, alterOpts) + val newProps = AdminUtils.fetchTopicConfig(zkClient, topic) + assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) + assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal)) + } +} \ No newline at end of file