From 71ed6ca3368ff38909f502565a4bf0f39e70fc6c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 7 Oct 2013 09:22:12 -0700 Subject: [PATCH] kafka-1073; CheckReassignmentStatus is broken; patched by Jun Rao; reviewed by Guozhang Wang, Swapnil Ghike and Neha Narkhede --- bin/kafka-check-reassignment-status.sh | 17 --- .../kafka/admin/CheckReassignmentStatus.scala | 110 ------------------ .../admin/ReassignPartitionsCommand.scala | 74 ++++++++++-- 3 files changed, 63 insertions(+), 138 deletions(-) delete mode 100755 bin/kafka-check-reassignment-status.sh delete mode 100644 core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala diff --git a/bin/kafka-check-reassignment-status.sh b/bin/kafka-check-reassignment-status.sh deleted file mode 100755 index 1f218585cdd..00000000000 --- a/bin/kafka-check-reassignment-status.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -$(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@ diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala deleted file mode 100644 index 7e85f87e96d..00000000000 --- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient -import kafka.utils._ -import scala.collection.Map -import kafka.common.TopicAndPartition - -object CheckReassignmentStatus extends Logging { - - def main(args: Array[String]): Unit = { - val parser = new OptionParser - val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " + - "new replicas they should be reassigned to") - .withRequiredArg - .describedAs("partition reassignment json file path") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + - "form host:port. Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - - val options = parser.parse(args : _*) - - for(arg <- List(jsonFileOpt, zkConnectOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val jsonFile = options.valueOf(jsonFileOpt) - val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileAsString(jsonFile) - val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - - try { - // read the json file into a string - val partitionsToBeReassigned = Json.parseFull(jsonString) match { - case Some(reassignedPartitions) => - val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] - partitions.map { m => - val topic = m.asInstanceOf[Map[String, String]].get("topic").get - val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt - val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get - val newReplicas = replicasList.split(",").map(_.toInt) - (TopicAndPartition(topic, partition), newReplicas.toSeq) - }.toMap - case None => Map.empty[TopicAndPartition, Seq[Int]] - } - - val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) - reassignedPartitionsStatus.foreach { partition => - partition._2 match { - case ReassignmentCompleted => - println("Partition %s reassignment completed successfully".format(partition._1)) - case ReassignmentFailed => - println("Partition %s reassignment failed".format(partition._1)) - case ReassignmentInProgress => - println("Partition %s reassignment in progress".format(partition._1)) - } - } - } - } - - def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) - :Map[TopicAndPartition, ReassignmentStatus] = { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) - // for all partitions whose replica reassignment is complete, check the status - partitionsToBeReassigned.map { topicAndPartition => - (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, - topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) - } - } - - def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, - reassignedReplicas: Seq[Int], - partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], - partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { - val newReplicas = partitionsToBeReassigned(topicAndPartition) - partitionsBeingReassigned.get(topicAndPartition) match { - case Some(partition) => ReassignmentInProgress - case None => - // check if AR == RAR - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) - if(assignedReplicas == newReplicas) - ReassignmentCompleted - else - ReassignmentFailed - } - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index f333d29bf36..c6fc4ab7c21 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -58,6 +58,12 @@ object ReassignPartitionsCommand extends Logging { .describedAs("execute") .ofType(classOf[String]) + val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " + + "new replicas they should be reassigned to, which can be obtained from the output of a dry run.") + .withRequiredArg + .describedAs("partition reassignment json file path") + .ofType(classOf[String]) + val options = parser.parse(args : _*) for(arg <- List(zkConnectOpt)) { @@ -80,7 +86,24 @@ object ReassignPartitionsCommand extends Logging { var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() - if(options.has(topicsToMoveJsonFileOpt)) { + if(options.has(statusCheckJsonFileOpt)) { + val jsonFile = options.valueOf(statusCheckJsonFileOpt) + val jsonString = Utils.readFileAsString(jsonFile) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) + + println("Status of partition reassignment:") + val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) + reassignedPartitionsStatus.foreach { partition => + partition._2 match { + case ReassignmentCompleted => + println("Reassignment of partition %s completed successfully".format(partition._1)) + case ReassignmentFailed => + println("Reassignment of partition %s failed".format(partition._1)) + case ReassignmentInProgress => + println("Reassignment of partition %s is still in progress".format(partition._1)) + } + } + } else if(options.has(topicsToMoveJsonFileOpt)) { val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) val brokerList = options.valueOf(brokerListOpt) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) @@ -107,16 +130,19 @@ object ReassignPartitionsCommand extends Logging { System.exit(1) } - if (options.has(executeOpt)) { - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) - - if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) - else - println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) - } else { - System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + - "The replica assignment is \n" + partitionsToBeReassigned.toString()) + if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) { + if (options.has(executeOpt)) { + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } else { + System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + + "The following is the replica assignment. Save it for the status check option.\n" + + ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)) + } } } catch { case e: Throwable => @@ -127,6 +153,32 @@ object ReassignPartitionsCommand extends Logging { zkClient.close() } } + + private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) + :Map[TopicAndPartition, ReassignmentStatus] = { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + partitionsToBeReassigned.map { topicAndPartition => + (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, + topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) + } + } + + private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition, + reassignedReplicas: Seq[Int], + partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], + partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = { + val newReplicas = partitionsToBeReassigned(topicAndPartition) + partitionsBeingReassigned.get(topicAndPartition) match { + case Some(partition) => ReassignmentInProgress + case None => + // check if the current replica assignment matches the expected one after reassignment + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) + if(assignedReplicas == newReplicas) + ReassignmentCompleted + else + ReassignmentFailed + } + } } class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])