Browse Source

kafka-1052; integrate add-partitions command into topicCommand; patched by Sriram Subramanian; reviewed by Jun Rao

pull/15/head
Sriram Subramanian 11 years ago committed by Jun Rao
parent
commit
8d4dbe60f1
  1. 127
      core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
  2. 56
      core/src/main/scala/kafka/admin/AdminUtils.scala
  3. 23
      core/src/main/scala/kafka/admin/TopicCommand.scala
  4. 10
      core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala

127
core/src/main/scala/kafka/admin/AddPartitionsCommand.scala

@ -1,127 +0,0 @@ @@ -1,127 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import joptsimple.OptionParser
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import scala.collection.mutable
import kafka.common.TopicAndPartition
object AddPartitionsCommand extends Logging {
def main(args: Array[String]): Unit = {
val parser = new OptionParser
val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])
val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions")
.withRequiredArg
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
.ofType(classOf[String])
.defaultsTo("")
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
if(!options.has(arg)) {
System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" +
"Missing required argument. " + " \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val nPartitions = options.valueOf(nPartitionsOpt).intValue
val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("adding partitions succeeded!")
} catch {
case e: Throwable =>
println("adding partitions failed because of " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
if (zkClient != null)
zkClient.close()
}
}
def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
// create the new partition replication list
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val newPartitionReplicaList = if (replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
if (unmatchedRepFactorList.size != 0)
throw new AdminOperationException("The replication factor in manual replication assignment " +
" is not equal to the existing replication factor for the topic " + existingReplicaList.size)
info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
if (!brokerList.toSet.subsetOf(availableBrokerList))
throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret.put(partitionId, brokerList.toList)
if (ret(partitionId).size != ret(startPartitionId).size)
throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
partitionId = partitionId + 1
}
ret.toMap
}
}

56
core/src/main/scala/kafka/admin/AdminUtils.scala

@ -29,6 +29,10 @@ import scala.collection._ @@ -29,6 +29,10 @@ import scala.collection._
import mutable.ListBuffer
import scala.collection.mutable
import kafka.common._
import scala.Predef._
import collection.Map
import scala.Some
import collection.Set
object AdminUtils extends Logging {
val rand = new Random
@ -82,6 +86,58 @@ object AdminUtils extends Logging { @@ -82,6 +86,58 @@ object AdminUtils extends Logging {
}
ret.toMap
}
def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
val existingReplicaList = existingPartitionsReplicaList.head._2
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
if (unmatchedRepFactorList.size != 0)
throw new AdminOperationException("The replication factor in manual replication assignment " +
" is not equal to the existing replication factor for the topic " + existingReplicaList.size)
info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
var partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
if (!brokerList.toSet.subsetOf(availableBrokerList))
throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret.put(partitionId, brokerList.toList)
if (ret(partitionId).size != ret(startPartitionId).size)
throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
partitionId = partitionId + 1
}
ret.toMap
}
def deleteTopic(zkClient: ZkClient, topic: String) {
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))

23
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -78,16 +78,22 @@ object TopicCommand { @@ -78,16 +78,22 @@ object TopicCommand {
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
val configs = parseTopicConfigs(opts)
if(opts.options.has(opts.partitionsOpt))
Utils.croak("Changing the number of partitions is not supported.")
if(opts.options.has(opts.replicationFactorOpt))
Utils.croak("Changing the replication factor is not supported.")
for(topic <- topics) {
val topic = opts.options.valueOf(opts.topicOpt)
if(opts.options.has(opts.configOpt)) {
val configs = parseTopicConfigs(opts)
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
if(opts.options.has(opts.partitionsOpt)) {
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("adding partitions succeeded!")
}
if(opts.options.has(opts.replicationFactorOpt))
Utils.croak("Changing the replication factor is not supported.")
}
def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@ -182,7 +188,8 @@ object TopicCommand { @@ -182,7 +188,8 @@ object TopicCommand {
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])

10
core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala

@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
def testTopicDoesNotExist {
try {
AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
AdminUtils.addPartitions(zkClient, "Blah", 1)
fail("Topic should not exist")
} catch {
case e: AdminOperationException => //this is good
@ -110,7 +110,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -110,7 +110,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
def testWrongReplicaCount {
try {
AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
} catch {
case e: AdminOperationException => //this is good
@ -119,7 +119,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -119,7 +119,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testIncrementPartitions {
AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
AdminUtils.addPartitions(zkClient, topic1, 3)
// wait until leader is elected
var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
@ -144,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -144,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testManualAssignmentOfReplicas {
AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
// wait until leader is elected
var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
@ -170,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -170,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testReplicaPlacement {
AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
AdminUtils.addPartitions(zkClient, topic3, 7)
// wait until leader is elected
var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)

Loading…
Cancel
Save