From 8d4dbe60f188c48a7f0d552b7b1109fb8e126521 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Wed, 9 Oct 2013 20:44:49 -0700 Subject: [PATCH] kafka-1052; integrate add-partitions command into topicCommand; patched by Sriram Subramanian; reviewed by Jun Rao --- .../kafka/admin/AddPartitionsCommand.scala | 127 ------------------ .../main/scala/kafka/admin/AdminUtils.scala | 56 ++++++++ .../main/scala/kafka/admin/TopicCommand.scala | 23 ++-- .../unit/kafka/admin/AddPartitionsTest.scala | 10 +- 4 files changed, 76 insertions(+), 140 deletions(-) delete mode 100644 core/src/main/scala/kafka/admin/AddPartitionsCommand.scala diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala deleted file mode 100644 index c74d9c22456..00000000000 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import joptsimple.OptionParser -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import scala.collection.mutable -import kafka.common.TopicAndPartition - -object AddPartitionsCommand extends Logging { - - def main(args: Array[String]): Unit = { - val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic") - .withRequiredArg - .describedAs("# of partitions") - .ofType(classOf[java.lang.Integer]) - val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions") - .withRequiredArg - .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + - "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") - .ofType(classOf[String]) - .defaultsTo("") - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) { - if(!options.has(arg)) { - System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" + - "Missing required argument. " + " \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val topic = options.valueOf(topicOpt) - val zkConnect = options.valueOf(zkConnectOpt) - val nPartitions = options.valueOf(nPartitionsOpt).intValue - val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) - var zkClient: ZkClient = null - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) - println("adding partitions succeeded!") - } catch { - case e: Throwable => - println("adding partitions failed because of " + e.getMessage) - println(Utils.stackTrace(e)) - } finally { - if (zkClient != null) - zkClient.close() - } - } - - def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { - val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - if (existingPartitionsReplicaList.size == 0) - throw new AdminOperationException("The topic %s does not exist".format(topic)) - - val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get - - // create the new partition replication list - val brokerList = ZkUtils.getSortedBrokerList(zkClient) - val newPartitionReplicaList = if (replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) - else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) - - // check if manual assignment has the right replication factor - val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size)) - if (unmatchedRepFactorList.size != 0) - throw new AdminOperationException("The replication factor in manual replication assignment " + - " is not equal to the existing replication factor for the topic " + existingReplicaList.size) - - info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) - val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) - // add the new list - partitionReplicaList ++= newPartitionReplicaList - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) - } - - def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { - val partitionList = replicaAssignmentList.split(",") - val ret = new mutable.HashMap[Int, List[Int]]() - var partitionId = startPartitionId - for (i <- 0 until partitionList.size) { - val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.size <= 0) - throw new AdminOperationException("replication factor must be larger than 0") - if (brokerList.size != brokerList.toSet.size) - throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) - if (!brokerList.toSet.subsetOf(availableBrokerList)) - throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString + - "available broker:" + availableBrokerList.toString) - ret.put(partitionId, brokerList.toList) - if (ret(partitionId).size != ret(startPartitionId).size) - throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList) - partitionId = partitionId + 1 - } - ret.toMap - } -} diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 6560fc668e8..8107a64cf1e 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -29,6 +29,10 @@ import scala.collection._ import mutable.ListBuffer import scala.collection.mutable import kafka.common._ +import scala.Predef._ +import collection.Map +import scala.Some +import collection.Set object AdminUtils extends Logging { val rand = new Random @@ -82,6 +86,58 @@ object AdminUtils extends Logging { } ret.toMap } + + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { + val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + if (existingPartitionsReplicaList.size == 0) + throw new AdminOperationException("The topic %s does not exist".format(topic)) + + val existingReplicaList = existingPartitionsReplicaList.head._2 + val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size + if (partitionsToAdd <= 0) + throw new AdminOperationException("The number of partitions for a topic can only be increased") + + // create the new partition replication list + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") + AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) + else + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) + + // check if manual assignment has the right replication factor + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size)) + if (unmatchedRepFactorList.size != 0) + throw new AdminOperationException("The replication factor in manual replication assignment " + + " is not equal to the existing replication factor for the topic " + existingReplicaList.size) + + info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) + val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) + // add the new list + partitionReplicaList ++= newPartitionReplicaList + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) + } + + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { + var partitionList = replicaAssignmentList.split(",") + val ret = new mutable.HashMap[Int, List[Int]]() + var partitionId = startPartitionId + partitionList = partitionList.takeRight(partitionList.size - partitionId) + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + if (brokerList.size <= 0) + throw new AdminOperationException("replication factor must be larger than 0") + if (brokerList.size != brokerList.toSet.size) + throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) + if (!brokerList.toSet.subsetOf(availableBrokerList)) + throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString + + "available broker:" + availableBrokerList.toString) + ret.put(partitionId, brokerList.toList) + if (ret(partitionId).size != ret(startPartitionId).size) + throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList) + partitionId = partitionId + 1 + } + ret.toMap + } def deleteTopic(zkClient: ZkClient, topic: String) { zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 06bbd37a5af..56f3177e28a 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -78,16 +78,22 @@ object TopicCommand { def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) - val topics = opts.options.valuesOf(opts.topicOpt) - val configs = parseTopicConfigs(opts) - if(opts.options.has(opts.partitionsOpt)) - Utils.croak("Changing the number of partitions is not supported.") - if(opts.options.has(opts.replicationFactorOpt)) - Utils.croak("Changing the replication factor is not supported.") - for(topic <- topics) { + val topic = opts.options.valueOf(opts.topicOpt) + if(opts.options.has(opts.configOpt)) { + val configs = parseTopicConfigs(opts) AdminUtils.changeTopicConfig(zkClient, topic, configs) println("Updated config for topic \"%s\".".format(topic)) } + if(opts.options.has(opts.partitionsOpt)) { + println("WARNING: If partitions are increased for a topic that has a key, the partition " + + "logic or ordering of the messages will be affected") + val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue + val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) + AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) + println("adding partitions succeeded!") + } + if(opts.options.has(opts.replicationFactorOpt)) + Utils.croak("Changing the replication factor is not supported.") } def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { @@ -182,7 +188,8 @@ object TopicCommand { .withRequiredArg .describedAs("name=value") .ofType(classOf[String]) - val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.") + val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " + + "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected") .withRequiredArg .describedAs("# of partitions") .ofType(classOf[java.lang.Integer]) diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 09254ccac2a..115e20305a1 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { def testTopicDoesNotExist { try { - AddPartitionsCommand.addPartitions(zkClient, "Blah", 1) + AdminUtils.addPartitions(zkClient, "Blah", 1) fail("Topic should not exist") } catch { case e: AdminOperationException => //this is good @@ -110,7 +110,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { def testWrongReplicaCount { try { - AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2") + AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2") fail("Add partitions should fail") } catch { case e: AdminOperationException => //this is good @@ -119,7 +119,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } def testIncrementPartitions { - AddPartitionsCommand.addPartitions(zkClient, topic1, 2) + AdminUtils.addPartitions(zkClient, topic1, 3) // wait until leader is elected var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500) var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500) @@ -144,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } def testManualAssignmentOfReplicas { - AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3") + AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3") // wait until leader is elected var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500) var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500) @@ -170,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } def testReplicaPlacement { - AddPartitionsCommand.addPartitions(zkClient, topic3, 6) + AdminUtils.addPartitions(zkClient, topic3, 7) // wait until leader is elected var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500) var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)