diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 28b12c7b89a..b700110f2d7 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -20,7 +20,7 @@ package kafka.admin import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig -import kafka.utils.{Logging, ZkUtils, Json} +import kafka.utils._ import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random @@ -164,6 +164,60 @@ object AdminUtils extends Logging { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) } + def isConsumerGroupActive(zkClient: ZkClient, group: String) = { + ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty + } + + /** + * Delete the whole directory of the given consumer group if the group is inactive. + * + * @param zkClient Zookeeper client + * @param group Consumer group + * @return whether or not we deleted the consumer group information + */ + def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = { + if (!isConsumerGroupActive(zkClient, group)) { + val dir = new ZKGroupDirs(group) + ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir) + true + } + else false + } + + /** + * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive. + * If the consumer group consumes no other topics, delete the whole consumer group directory. + * + * @param zkClient Zookeeper client + * @param group Consumer group + * @param topic Topic of the consumer group information we wish to delete + * @return whether or not we deleted the consumer group information for the given topic + */ + def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = { + val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) + if (topics == Seq(topic)) { + deleteConsumerGroupInZK(zkClient, group) + } + else if (!isConsumerGroupActive(zkClient, group)) { + val dir = new ZKGroupTopicDirs(group, topic) + ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir) + ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir) + true + } + else false + } + + /** + * Delete every inactive consumer group's information about the given topic in Zookeeper. + * + * @param zkClient Zookeeper client + * @param topic Topic of the consumer group information we wish to delete + */ + def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) { + val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic) + groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic)) + } + def topicExists(zkClient: ZkClient, topic: String): Boolean = zkClient.exists(ZkUtils.getTopicPath(topic)) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala new file mode 100644 index 00000000000..89fa29a882a --- /dev/null +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + + +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import kafka.common._ +import java.util.Properties +import kafka.client.ClientUtils +import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest} +import org.I0Itec.zkclient.exception.ZkNoNodeException +import kafka.common.TopicAndPartition +import joptsimple.{OptionSpec, OptionParser} +import scala.collection.{Set, mutable} +import kafka.consumer.SimpleConsumer +import collection.JavaConversions._ + + +object ConsumerGroupCommand { + + def main(args: Array[String]) { + val opts = new ConsumerGroupCommandOptions(args) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.") + + // should have exactly one action + val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") + + opts.checkArgs() + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + try { + if (opts.options.has(opts.listOpt)) + list(zkClient) + else if (opts.options.has(opts.describeOpt)) + describe(zkClient, opts) + else if (opts.options.has(opts.deleteOpt)) + delete(zkClient, opts) + } catch { + case e: Throwable => + println("Error while executing consumer group command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + zkClient.close() + } + } + + def list(zkClient: ZkClient) { + ZkUtils.getConsumerGroups(zkClient).foreach(println) + } + + def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val configs = parseConfigs(opts) + val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt + val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt + val group = opts.options.valueOf(opts.groupOpt) + val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) + if (topics.isEmpty) { + println("No topic available for consumer group provided") + } + topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + } + + def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) { + deleteForTopic(zkClient, opts) + } + else if (opts.options.has(opts.groupOpt)) { + deleteForGroup(zkClient, opts) + } + else if (opts.options.has(opts.topicOpt)) { + deleteAllForTopic(zkClient, opts) + } + } + + private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val groups = opts.options.valuesOf(opts.groupOpt) + groups.foreach { group => + try { + if (AdminUtils.deleteConsumerGroupInZK(zkClient, group)) + println("Deleted all consumer group information for group %s in zookeeper.".format(group)) + else + println("Delete for group %s failed because its consumers are still active.".format(group)) + } + catch { + case e: ZkNoNodeException => + println("Delete for group %s failed because group does not exist.".format(group)) + } + } + } + + private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val groups = opts.options.valuesOf(opts.groupOpt) + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + groups.foreach { group => + try { + if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic)) + println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic)) + else + println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic)) + } + catch { + case e: ZkNoNodeException => + println("Delete for group %s topic %s failed because group does not exist.".format(group, topic)) + } + } + } + + private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic)) + } + + private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props + } + + private def describeTopic(zkClient: ZkClient, + group: String, + topic: String, + channelSocketTimeoutMs: Int, + channelRetryBackoffMs: Int) { + val topicPartitions = getTopicPartitions(zkClient, topic) + val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) + println("%s, %s, %s, %s, %s, %s, %s" + .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) + topicPartitions + .sortBy { case topicPartition => topicPartition.partition } + .foreach { topicPartition => + describePartition(zkClient, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition)) + } + } + + private def getTopicPartitions(zkClient: ZkClient, topic: String) = { + val topicPartitionMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic)) + val partitions = topicPartitionMap.getOrElse(topic, Seq.empty) + partitions.map(TopicAndPartition(topic, _)) + } + + private def getPartitionOffsets(zkClient: ZkClient, + group: String, + topicPartitions: Seq[TopicAndPartition], + channelSocketTimeoutMs: Int, + channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = { + val offsetMap = mutable.Map[TopicAndPartition, Long]() + val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) + channel.send(OffsetFetchRequest(group, topicPartitions)) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => + if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool + // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) + try { + val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper." + .format(group, topicAndPartition)) + } + } + else if (offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) + else + println("Could not fetch offset from kafka for group %s partition %s due to %s." + .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) + } + channel.disconnect() + offsetMap.toMap + } + + private def describePartition(zkClient: ZkClient, + group: String, + topic: String, + partition: Int, + offsetOpt: Option[Long]) { + val topicAndPartition = TopicAndPartition(topic, partition) + val groupDirs = new ZKGroupTopicDirs(group, topic) + val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1 + ZkUtils.getLeaderForPartition(zkClient, topic, partition) match { + case Some(-1) => + println("%s, %s, %s, %s, %s, %s, %s" + .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) + case Some(brokerId) => + val consumerOpt = getConsumer(zkClient, brokerId) + consumerOpt match { + case Some(consumer) => + val request = + OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + consumer.close() + + val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) + println("%s, %s, %s, %s, %s, %s, %s" + .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) + case None => // ignore + } + case None => + println("No broker for partition %s".format(topicAndPartition)) + } + } + + private def getConsumer(zkClient: ZkClient, brokerId: Int): Option[SimpleConsumer] = { + try { + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { + case Some(brokerInfoString) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand")) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getMessage) + None + } + } + + class ConsumerGroupCommandOptions(args: Array[String]) { + val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over." + val GroupDoc = "The consumer group we wish to act on." + val TopicDoc = "The topic whose consumer group information should be deleted." + val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600" + val ListDoc = "List all consumer groups." + val DescribeDoc = "Describe consumer group and list offset lag related to given group." + val nl = System.getProperty("line.separator") + val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " + + "over the entire consumer group. For instance --group g1 --group g2" + nl + + "Pass in groups with a single topic to just delete the given topic's partition offsets and ownership " + + "information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl + + "Pass in just a topic to delete the given topic's partition offsets and ownership information " + + "for every consumer group. For instance --topic t1" + nl + + "WARNING: Only does deletions on consumer groups that are not active." + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val groupOpt = parser.accepts("group", GroupDoc) + .withRequiredArg + .describedAs("consumer group") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", TopicDoc) + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val configOpt = parser.accepts("config", ConfigDoc) + .withRequiredArg + .describedAs("name=value") + .ofType(classOf[String]) + val listOpt = parser.accepts("list", ListDoc) + val describeOpt = parser.accepts("describe", DescribeDoc) + val deleteOpt = parser.accepts("delete", DeleteDoc) + val options = parser.parse(args : _*) + + val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) + + def checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + if (options.has(describeOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) + if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) + CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt) + CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt) + CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt) + } + } +} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index c14bd455b66..c78a1b6ff42 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -749,6 +749,26 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getConsumerGroups(zkClient: ZkClient) = { + ZkUtils.getChildren(zkClient, ConsumersPath) + } + + def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = { + ZkUtils.getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir) + } + + def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = { + val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath) + if (groups == null) Set.empty + else { + groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) => + val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupOffsetsDir) + if (topics.contains(topic)) consumerGroupsForTopic + group + else consumerGroupsForTopic + } + } + } } object ZKStringSerializer extends ZkSerializer { @@ -769,11 +789,13 @@ class ZKGroupDirs(val group: String) { def consumerDir = ZkUtils.ConsumersPath def consumerGroupDir = consumerDir + "/" + group def consumerRegistryDir = consumerGroupDir + "/ids" + def consumerGroupOffsetsDir = consumerGroupDir + "/offsets" + def consumerGroupOwnersDir = consumerGroupDir + "/owners" } class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) { - def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic - def consumerOwnerDir = consumerGroupDir + "/owners/" + topic + def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic + def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic } diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala new file mode 100644 index 00000000000..d530338728b --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils} +import kafka.server.KafkaConfig +import org.junit.Test +import kafka.consumer._ +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer} +import kafka.integration.KafkaServerTestHarness + + +class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness { + val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_)) + + @Test + def testGroupWideDeleteInZK() { + val topic = "test" + val groupToDelete = "groupToDelete" + val otherGroup = "otherGroup" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false) + fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete) + + TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)), + "DeleteConsumerGroupInZK should delete the provided consumer group's directory") + TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)), + "DeleteConsumerGroupInZK should not delete unrelated consumer group directories") + } + + @Test + def testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup() { + val topic = "test" + val groupToDelete = "groupToDelete" + val otherGroup = "otherGroup" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true) + fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete) + + TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)), + "DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active") + TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)), + "DeleteConsumerGroupInZK should not delete unrelated consumer group directories") + } + + @Test + def testGroupTopicWideDeleteInZKForGroupConsumingOneTopic() { + val topic = "test" + val groupToDelete = "groupToDelete" + val otherGroup = "otherGroup" + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false) + fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic) + + TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)), + "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic") + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topic)), + "DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories") + } + + @Test + def testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics() { + val topicToDelete = "topicToDelete" + val otherTopic = "otherTopic" + val groupToDelete = "groupToDelete" + val otherGroup = "otherGroup" + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + + fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false) + fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false) + fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete) + + TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)), + "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic") + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, otherTopic)), + "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics") + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topicToDelete)), + "DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories") + } + + @Test + def testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics() { + val topicToDelete = "topicToDelete" + val otherTopic = "otherTopic" + val group = "group" + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + + fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true) + fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true) + + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete) + + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)), + "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active") + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, otherTopic)), + "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics") + } + + @Test + def testTopicWideDeleteInZK() { + val topicToDelete = "topicToDelete" + val otherTopic = "otherTopic" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete)) + val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic)) + groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false)) + groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false)) + + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete) + + TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist), + "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist), + "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + } + + @Test + def testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK() { + val topic = "topic" + val group = "group" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val dir = new ZKGroupTopicDirs(group, topic) + fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false) + + AdminUtils.deleteTopic(zkClient, topic) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + + TestUtils.waitUntilTrue(() => !groupDirExists(dir), + "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + //produce events + val producer = TestUtils.createNewProducer(brokerList) + produceEvents(producer, topic, List.fill(10)("test")) + + //consume events + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer") + consumerProps.put("auto.commit.enable", "false") + consumerProps.put("auto.offset.reset", "smallest") + consumerProps.put("consumer.timeout.ms", "2000") + consumerProps.put("fetch.wait.max.ms", "0") + val consumerConfig = new ConsumerConfig(consumerProps) + val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head + consumeEvents(messageStream, 5) + consumerConnector.commitOffsets(false) + producer.close() + consumerConnector.shutdown() + + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(dir), + "Consumer group info should exist after consuming from a recreated topic") + } + + private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, registerConsumer: Boolean) { + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId) + val consumerConfig = new ConsumerConfig(consumerProps) + val dir = new ZKGroupTopicDirs(group, topic) + TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset) + ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "") + ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir) + if (registerConsumer) { + ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "") + } + } + + private def groupDirExists(dir: ZKGroupDirs) = { + ZkUtils.pathExists(zkClient, dir.consumerGroupDir) + } + + private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = { + ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir) + } + + private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) { + messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes))) + } + + private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) { + val iter = messageStream.iterator + (0 until n).foreach(_ => iter.next) + } +} diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 33c27678bf8..0cbd72684a0 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -16,23 +16,15 @@ */ package kafka.admin -import java.io.File - import kafka.log.Log import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ import kafka.utils.{ZkUtils, TestUtils} -import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig} +import kafka.server.{KafkaServer, KafkaConfig} import org.junit.Test -import kafka.common._ -import kafka.producer.{ProducerConfig, Producer} import java.util.Properties -import kafka.api._ -import kafka.consumer.SimpleConsumer -import kafka.producer.KeyedMessage import kafka.common.TopicAndPartition -import kafka.api.PartitionOffsetRequestInfo class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -43,7 +35,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val servers = createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -68,7 +60,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { "Admin path /admin/delete_topic/test path deleted even when a follower replica is down") // restart follower replica follower.startup() - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -95,7 +87,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { controller.startup() follower.startup() - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) servers.foreach(_.shutdown()) } @@ -141,7 +133,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0) assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) follower.startup() - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) allServers.foreach(_.shutdown()) } @@ -160,7 +152,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, topic) follower.startup() // test if topic deletion is resumed - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // verify that new partition doesn't exist on any broker either TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), @@ -178,7 +170,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // add partitions to topic val newPartition = TopicAndPartition(topic, 1) AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2") - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // verify that new partition doesn't exist on any broker either assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) @@ -193,7 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val servers = createTestTopicAndCluster(topic) // start topic deletion AdminUtils.deleteTopic(zkClient, topic) - verifyTopicDeletion(topic, servers) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // re-create topic on same replicas AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // wait until leader is elected @@ -213,7 +205,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // start topic deletion AdminUtils.deleteTopic(zkClient, "test2") // verify delete topic path for test2 is removed from zookeeper - verifyTopicDeletion("test2", servers) + TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers) // verify that topic test is untouched TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), @@ -252,7 +244,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // delete topic AdminUtils.deleteTopic(zkClient, "test") - verifyTopicDeletion("test", servers) + TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers) servers.foreach(_.shutdown()) } @@ -279,30 +271,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { servers } - private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) { - val topicAndPartition = TopicAndPartition(topic, 0) - // wait until admin path for delete topic is deleted, signaling completion of topic deletion - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), - "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") - TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), - "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") - // ensure that the topic-partition has been deleted from all brokers' replica managers - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None), - "Replica manager's should have deleted all of this topic's partitions") - // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper - assertTrue("Replica logs not deleted after delete topic is complete", - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) - // ensure that topic is removed from all cleaner offsets - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res && - { - val topicAndPartition = TopicAndPartition(topic,0) - val logdir = server.getLogManager().logDirs(0) - val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read() - !checkpoints.contains(topicAndPartition) - }), - "Cleaner offset for deleted partition should have been removed") - } - private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { var counter = 0 for(dup <- 0 until numDups; key <- 0 until numKeys) yield { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 54755e8dd3f..21d0ed2cb7c 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -140,9 +140,10 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfigs(numConfigs: Int, - enableControlledShutdown: Boolean = true): List[Properties] = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): List[Properties] = { for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port, enableControlledShutdown) + yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic) } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { @@ -153,7 +154,8 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfig(nodeId: Int, port: Int = choosePort(), - enableControlledShutdown: Boolean = true): Properties = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): Properties = { val props = new Properties if (nodeId >= 0) props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") @@ -162,6 +164,7 @@ object TestUtils extends Logging { props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props.put("controlled.shutdown.enable", enableControlledShutdown.toString) + props.put("delete.topic.enable", enableDeleteTopic.toString) props } @@ -793,6 +796,30 @@ object TestUtils extends Logging { } messages.reverse } + + def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) { + val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") + // ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitUntilTrue(() => + servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)), + "Replica manager's should have deleted all of this topic's partitions") + // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper + assertTrue("Replica logs not deleted after delete topic is complete", + servers.forall(server => topicAndPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty))) + // ensure that topic is removed from all cleaner offsets + TestUtils.waitUntilTrue(() => servers.forall(server => topicAndPartitions.forall { tp => + val checkpoints = server.getLogManager().logDirs.map { logDir => + new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read() + } + checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) + }), "Cleaner offset for deleted partition should have been removed") + } + } object TestZKUtils {