Browse Source

KAFKA-1476 Added a ConsumerCommand tool that will replace other consumer related tools in the future; reviewed by Neha Narkhede

pull/1442/head
Onur Karaman 10 years ago committed by Neha Narkhede
parent
commit
9fe9913e95
  1. 56
      core/src/main/scala/kafka/admin/AdminUtils.scala
  2. 310
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
  3. 26
      core/src/main/scala/kafka/utils/ZkUtils.scala
  4. 212
      core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
  5. 52
      core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
  6. 33
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

56
core/src/main/scala/kafka/admin/AdminUtils.scala

@ -20,7 +20,7 @@ package kafka.admin @@ -20,7 +20,7 @@ package kafka.admin
import kafka.common._
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.utils.{Logging, ZkUtils, Json}
import kafka.utils._
import kafka.api.{TopicMetadata, PartitionMetadata}
import java.util.Random
@ -164,6 +164,60 @@ object AdminUtils extends Logging { @@ -164,6 +164,60 @@ object AdminUtils extends Logging {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
}
def isConsumerGroupActive(zkClient: ZkClient, group: String) = {
ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty
}
/**
* Delete the whole directory of the given consumer group if the group is inactive.
*
* @param zkClient Zookeeper client
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = {
if (!isConsumerGroupActive(zkClient, group)) {
val dir = new ZKGroupDirs(group)
ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir)
true
}
else false
}
/**
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
* If the consumer group consumes no other topics, delete the whole consumer group directory.
*
* @param zkClient Zookeeper client
* @param group Consumer group
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
*/
def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = {
val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
if (topics == Seq(topic)) {
deleteConsumerGroupInZK(zkClient, group)
}
else if (!isConsumerGroupActive(zkClient, group)) {
val dir = new ZKGroupTopicDirs(group, topic)
ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir)
ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir)
true
}
else false
}
/**
* Delete every inactive consumer group's information about the given topic in Zookeeper.
*
* @param zkClient Zookeeper client
* @param topic Topic of the consumer group information we wish to delete
*/
def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) {
val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic)
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
}
def topicExists(zkClient: ZkClient, topic: String): Boolean =
zkClient.exists(ZkUtils.getTopicPath(topic))

310
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -0,0 +1,310 @@ @@ -0,0 +1,310 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import kafka.common._
import java.util.Properties
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest}
import org.I0Itec.zkclient.exception.ZkNoNodeException
import kafka.common.TopicAndPartition
import joptsimple.{OptionSpec, OptionParser}
import scala.collection.{Set, mutable}
import kafka.consumer.SimpleConsumer
import collection.JavaConversions._
object ConsumerGroupCommand {
def main(args: Array[String]) {
val opts = new ConsumerGroupCommandOptions(args)
if(args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")
// should have exactly one action
val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
if(actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")
opts.checkArgs()
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
try {
if (opts.options.has(opts.listOpt))
list(zkClient)
else if (opts.options.has(opts.describeOpt))
describe(zkClient, opts)
else if (opts.options.has(opts.deleteOpt))
delete(zkClient, opts)
} catch {
case e: Throwable =>
println("Error while executing consumer group command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
zkClient.close()
}
}
def list(zkClient: ZkClient) {
ZkUtils.getConsumerGroups(zkClient).foreach(println)
}
def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
val configs = parseConfigs(opts)
val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
val group = opts.options.valueOf(opts.groupOpt)
val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
if (topics.isEmpty) {
println("No topic available for consumer group provided")
}
topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
}
def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
deleteForTopic(zkClient, opts)
}
else if (opts.options.has(opts.groupOpt)) {
deleteForGroup(zkClient, opts)
}
else if (opts.options.has(opts.topicOpt)) {
deleteAllForTopic(zkClient, opts)
}
}
private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
val groups = opts.options.valuesOf(opts.groupOpt)
groups.foreach { group =>
try {
if (AdminUtils.deleteConsumerGroupInZK(zkClient, group))
println("Deleted all consumer group information for group %s in zookeeper.".format(group))
else
println("Delete for group %s failed because its consumers are still active.".format(group))
}
catch {
case e: ZkNoNodeException =>
println("Delete for group %s failed because group does not exist.".format(group))
}
}
}
private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
val groups = opts.options.valuesOf(opts.groupOpt)
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
groups.foreach { group =>
try {
if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
else
println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
}
catch {
case e: ZkNoNodeException =>
println("Delete for group %s topic %s failed because group does not exist.".format(group, topic))
}
}
}
private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
}
private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
"Invalid config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
props
}
private def describeTopic(zkClient: ZkClient,
group: String,
topic: String,
channelSocketTimeoutMs: Int,
channelRetryBackoffMs: Int) {
val topicPartitions = getTopicPartitions(zkClient, topic)
val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
println("%s, %s, %s, %s, %s, %s, %s"
.format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
topicPartitions
.sortBy { case topicPartition => topicPartition.partition }
.foreach { topicPartition =>
describePartition(zkClient, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
}
}
private def getTopicPartitions(zkClient: ZkClient, topic: String) = {
val topicPartitionMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
partitions.map(TopicAndPartition(topic, _))
}
private def getPartitionOffsets(zkClient: ZkClient,
group: String,
topicPartitions: Seq[TopicAndPartition],
channelSocketTimeoutMs: Int,
channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
val offsetMap = mutable.Map[TopicAndPartition, Long]()
val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
channel.send(OffsetFetchRequest(group, topicPartitions))
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
// this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
// (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
try {
val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
offsetMap.put(topicAndPartition, offset)
} catch {
case z: ZkNoNodeException =>
println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
.format(group, topicAndPartition))
}
}
else if (offsetAndMetadata.error == ErrorMapping.NoError)
offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
else
println("Could not fetch offset from kafka for group %s partition %s due to %s."
.format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
}
channel.disconnect()
offsetMap.toMap
}
private def describePartition(zkClient: ZkClient,
group: String,
topic: String,
partition: Int,
offsetOpt: Option[Long]) {
val topicAndPartition = TopicAndPartition(topic, partition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1
ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
case Some(-1) =>
println("%s, %s, %s, %s, %s, %s, %s"
.format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
case Some(brokerId) =>
val consumerOpt = getConsumer(zkClient, brokerId)
consumerOpt match {
case Some(consumer) =>
val request =
OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
consumer.close()
val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _)
println("%s, %s, %s, %s, %s, %s, %s"
.format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none")))
case None => // ignore
}
case None =>
println("No broker for partition %s".format(topicAndPartition))
}
}
private def getConsumer(zkClient: ZkClient, brokerId: Int): Option[SimpleConsumer] = {
try {
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfoString) =>
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
} catch {
case t: Throwable =>
println("Could not parse broker info due to " + t.getMessage)
None
}
}
class ConsumerGroupCommandOptions(args: Array[String]) {
val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over."
val GroupDoc = "The consumer group we wish to act on."
val TopicDoc = "The topic whose consumer group information should be deleted."
val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
val ListDoc = "List all consumer groups."
val DescribeDoc = "Describe consumer group and list offset lag related to given group."
val nl = System.getProperty("line.separator")
val DeleteDoc = "Pass in groups to delete topic partition offsets and ownership information " +
"over the entire consumer group. For instance --group g1 --group g2" + nl +
"Pass in groups with a single topic to just delete the given topic's partition offsets and ownership " +
"information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl +
"Pass in just a topic to delete the given topic's partition offsets and ownership information " +
"for every consumer group. For instance --topic t1" + nl +
"WARNING: Only does deletions on consumer groups that are not active."
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val groupOpt = parser.accepts("group", GroupDoc)
.withRequiredArg
.describedAs("consumer group")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", TopicDoc)
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val configOpt = parser.accepts("config", ConfigDoc)
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val options = parser.parse(args : _*)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
def checkArgs() {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
if (options.has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt)
}
}
}

26
core/src/main/scala/kafka/utils/ZkUtils.scala

@ -749,6 +749,26 @@ object ZkUtils extends Logging { @@ -749,6 +749,26 @@ object ZkUtils extends Logging {
}.flatten.toSet
}
}
def getConsumerGroups(zkClient: ZkClient) = {
ZkUtils.getChildren(zkClient, ConsumersPath)
}
def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = {
ZkUtils.getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
}
def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = {
val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath)
if (groups == null) Set.empty
else {
groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) =>
val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupOffsetsDir)
if (topics.contains(topic)) consumerGroupsForTopic + group
else consumerGroupsForTopic
}
}
}
}
object ZKStringSerializer extends ZkSerializer {
@ -769,11 +789,13 @@ class ZKGroupDirs(val group: String) { @@ -769,11 +789,13 @@ class ZKGroupDirs(val group: String) {
def consumerDir = ZkUtils.ConsumersPath
def consumerGroupDir = consumerDir + "/" + group
def consumerRegistryDir = consumerGroupDir + "/ids"
def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}
class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic
def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
}

212
core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala

@ -0,0 +1,212 @@ @@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils}
import kafka.server.KafkaConfig
import org.junit.Test
import kafka.consumer._
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import kafka.integration.KafkaServerTestHarness
class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_))
@Test
def testGroupWideDeleteInZK() {
val topic = "test"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
TestUtils.createTopic(zkClient, topic, 1, 3, servers)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
"DeleteConsumerGroupInZK should delete the provided consumer group's directory")
TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)),
"DeleteConsumerGroupInZK should not delete unrelated consumer group directories")
}
@Test
def testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup() {
val topic = "test"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
TestUtils.createTopic(zkClient, topic, 1, 3, servers)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete)
TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)),
"DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active")
TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)),
"DeleteConsumerGroupInZK should not delete unrelated consumer group directories")
}
@Test
def testGroupTopicWideDeleteInZKForGroupConsumingOneTopic() {
val topic = "test"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
TestUtils.createTopic(zkClient, topic, 1, 3, servers)
fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topic)
TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic")
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topic)),
"DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories")
}
@Test
def testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics() {
val topicToDelete = "topicToDelete"
val otherTopic = "otherTopic"
val groupToDelete = "groupToDelete"
val otherGroup = "otherGroup"
TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
fillInConsumerGroupInfo(topicToDelete, otherGroup, "consumer", 0, 10, false)
AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, groupToDelete, topicToDelete)
TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, topicToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic")
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(groupToDelete, otherTopic)),
"DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics")
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(otherGroup, topicToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should not delete unrelated consumer group owner and offset directories")
}
@Test
def testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics() {
val topicToDelete = "topicToDelete"
val otherTopic = "otherTopic"
val group = "group"
TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete)
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)),
"DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active")
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, otherTopic)),
"DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics")
}
@Test
def testTopicWideDeleteInZK() {
val topicToDelete = "topicToDelete"
val otherTopic = "otherTopic"
val groups = Seq("group1", "group2")
TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false))
AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete)
TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
"Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
"Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
}
@Test
def testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK() {
val topic = "topic"
val group = "group"
TestUtils.createTopic(zkClient, topic, 1, 3, servers)
val dir = new ZKGroupTopicDirs(group, topic)
fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
AdminUtils.deleteTopic(zkClient, topic)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
TestUtils.waitUntilTrue(() => !groupDirExists(dir),
"Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
//produce events
val producer = TestUtils.createNewProducer(brokerList)
produceEvents(producer, topic, List.fill(10)("test"))
//consume events
val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer")
consumerProps.put("auto.commit.enable", "false")
consumerProps.put("auto.offset.reset", "smallest")
consumerProps.put("consumer.timeout.ms", "2000")
consumerProps.put("fetch.wait.max.ms", "0")
val consumerConfig = new ConsumerConfig(consumerProps)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
consumeEvents(messageStream, 5)
consumerConnector.commitOffsets(false)
producer.close()
consumerConnector.shutdown()
TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(dir),
"Consumer group info should exist after consuming from a recreated topic")
}
private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, registerConsumer: Boolean) {
val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId)
val consumerConfig = new ConsumerConfig(consumerProps)
val dir = new ZKGroupTopicDirs(group, topic)
TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset)
ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "")
ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir)
if (registerConsumer) {
ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "")
}
}
private def groupDirExists(dir: ZKGroupDirs) = {
ZkUtils.pathExists(zkClient, dir.consumerGroupDir)
}
private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = {
ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir)
}
private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) {
messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes)))
}
private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) {
val iter = messageStream.iterator
(0 until n).foreach(_ => iter.next)
}
}

52
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala

@ -16,23 +16,15 @@ @@ -16,23 +16,15 @@
*/
package kafka.admin
import java.io.File
import kafka.log.Log
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
import kafka.utils.{ZkUtils, TestUtils}
import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig}
import kafka.server.{KafkaServer, KafkaConfig}
import org.junit.Test
import kafka.common._
import kafka.producer.{ProducerConfig, Producer}
import java.util.Properties
import kafka.api._
import kafka.consumer.SimpleConsumer
import kafka.producer.KeyedMessage
import kafka.common.TopicAndPartition
import kafka.api.PartitionOffsetRequestInfo
class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
@ -43,7 +35,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -43,7 +35,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkClient, topic)
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
@ -68,7 +60,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -68,7 +60,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
"Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
// restart follower replica
follower.startup()
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
@ -95,7 +87,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -95,7 +87,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
controller.startup()
follower.startup()
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
servers.foreach(_.shutdown())
}
@ -141,7 +133,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -141,7 +133,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
follower.startup()
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
allServers.foreach(_.shutdown())
}
@ -160,7 +152,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -160,7 +152,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
AdminUtils.deleteTopic(zkClient, topic)
follower.startup()
// test if topic deletion is resumed
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
@ -178,7 +170,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -178,7 +170,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// add partitions to topic
val newPartition = TopicAndPartition(topic, 1)
AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
assertTrue("Replica logs not deleted after delete topic is complete",
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
@ -193,7 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -193,7 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val servers = createTestTopicAndCluster(topic)
// start topic deletion
AdminUtils.deleteTopic(zkClient, topic)
verifyTopicDeletion(topic, servers)
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// re-create topic on same replicas
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until leader is elected
@ -213,7 +205,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -213,7 +205,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// start topic deletion
AdminUtils.deleteTopic(zkClient, "test2")
// verify delete topic path for test2 is removed from zookeeper
verifyTopicDeletion("test2", servers)
TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
res && server.getLogManager().getLog(topicAndPartition).isDefined),
@ -252,7 +244,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -252,7 +244,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// delete topic
AdminUtils.deleteTopic(zkClient, "test")
verifyTopicDeletion("test", servers)
TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
servers.foreach(_.shutdown())
}
@ -279,30 +271,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -279,30 +271,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
servers
}
private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) {
val topicAndPartition = TopicAndPartition(topic, 0)
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
"Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
"Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
// ensure that the topic-partition has been deleted from all brokers' replica managers
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None),
"Replica manager's should have deleted all of this topic's partitions")
// ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
assertTrue("Replica logs not deleted after delete topic is complete",
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
// ensure that topic is removed from all cleaner offsets
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res &&
{
val topicAndPartition = TopicAndPartition(topic,0)
val logdir = server.getLogManager().logDirs(0)
val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read()
!checkpoints.contains(topicAndPartition)
}),
"Cleaner offset for deleted partition should have been removed")
}
private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
var counter = 0
for(dup <- 0 until numDups; key <- 0 until numKeys) yield {

33
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -140,9 +140,10 @@ object TestUtils extends Logging { @@ -140,9 +140,10 @@ object TestUtils extends Logging {
* Create a test config for the given node id
*/
def createBrokerConfigs(numConfigs: Int,
enableControlledShutdown: Boolean = true): List[Properties] = {
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false): List[Properties] = {
for((port, node) <- choosePorts(numConfigs).zipWithIndex)
yield createBrokerConfig(node, port, enableControlledShutdown)
yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic)
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
@ -153,7 +154,8 @@ object TestUtils extends Logging { @@ -153,7 +154,8 @@ object TestUtils extends Logging {
* Create a test config for the given node id
*/
def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
enableControlledShutdown: Boolean = true): Properties = {
enableControlledShutdown: Boolean = true,
enableDeleteTopic: Boolean = false): Properties = {
val props = new Properties
if (nodeId >= 0) props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
@ -162,6 +164,7 @@ object TestUtils extends Logging { @@ -162,6 +164,7 @@ object TestUtils extends Logging {
props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
props.put("delete.topic.enable", enableDeleteTopic.toString)
props
}
@ -793,6 +796,30 @@ object TestUtils extends Logging { @@ -793,6 +796,30 @@ object TestUtils extends Logging {
}
messages.reverse
}
def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
"Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
"Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
// ensure that the topic-partition has been deleted from all brokers' replica managers
TestUtils.waitUntilTrue(() =>
servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
"Replica manager's should have deleted all of this topic's partitions")
// ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
assertTrue("Replica logs not deleted after delete topic is complete",
servers.forall(server => topicAndPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
// ensure that topic is removed from all cleaner offsets
TestUtils.waitUntilTrue(() => servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), "Cleaner offset for deleted partition should have been removed")
}
}
object TestZKUtils {

Loading…
Cancel
Save