From bf237fa7c576bd141d78fdea9f17f65ea269c290 Mon Sep 17 00:00:00 2001 From: huxi Date: Fri, 20 Jul 2018 23:52:04 +0800 Subject: [PATCH] KAFKA-7141; Consumer group describe should include groups with no committed offsets (#5356) Currently, if a consumer group never commits offsets, ConsumerGroupCommand will not include it in the describe output even if the member assignment is valid. Instead, the tool should be able to describe the group information showing empty current_offset and LAG. Reviewers: Sriharsha Chintalapani , Vahid Hashemian , Jason Gustafson --- .../kafka/admin/ConsumerGroupCommand.scala | 21 ++++++-------- .../admin/ConsumerGroupCommandTest.scala | 17 ++++++----- .../admin/DescribeConsumerGroupTest.scala | 28 ++++++++++++++++++- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 48c2cffb5d3..1d61720bfa9 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -27,7 +27,6 @@ import kafka.utils._ import org.apache.kafka.clients.{CommonClientConfigs, admin} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} -import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{KafkaException, Node, TopicPartition} @@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{Seq, Set} -import scala.concurrent.ExecutionException import scala.util.{Failure, Success, Try} object ConsumerGroupCommand extends Logging { @@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging { val state = consumerGroup.state val committedOffsets = getCommittedOffsets(groupId).asScala.toMap var assignedTopicPartitions = ListBuffer[TopicPartition]() - val rowsWithConsumer = if (committedOffsets.isEmpty) List[PartitionAssignmentState]() else consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq - .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size) - .flatMap { consumerSummary => - val topicPartitions = consumerSummary.assignment.topicPartitions.asScala - assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions - val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala - .map { topicPartition => - topicPartition -> committedOffsets.get(topicPartition).map(_.offset) - }.toMap + val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq + .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary => + val topicPartitions = consumerSummary.assignment.topicPartitions.asScala + assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions + val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala + .map { topicPartition => + topicPartition -> committedOffsets.get(topicPartition).map(_.offset) + }.toMap collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList, partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"), Some(s"${consumerSummary.clientId}")) - } + } val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap { case (topicPartition, offset) => diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index 072f29abada..d5eea981635 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { def addConsumerGroupExecutor(numConsumers: Int, topic: String = topic, group: String = group, - strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = { - val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy) + strategy: String = classOf[RangeAssignor].getName, + customPropsOpt: Option[Properties] = None): ConsumerGroupExecutor = { + val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy, customPropsOpt) addExecutor(executor) executor } @@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { object ConsumerGroupCommandTest { - abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable { + abstract class AbstractConsumerRunnable(broker: String, groupId: String, customPropsOpt: Option[Properties] = None) extends Runnable { val props = new Properties configure(props) + customPropsOpt.foreach(props.asScala ++= _.asScala) val consumer = new KafkaConsumer(props) def configure(props: Properties): Unit = { @@ -145,8 +147,8 @@ object ConsumerGroupCommandTest { } } - class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String) - extends AbstractConsumerRunnable(broker, groupId) { + class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String, customPropsOpt: Option[Properties] = None) + extends AbstractConsumerRunnable(broker, groupId, customPropsOpt) { override def configure(props: Properties): Unit = { super.configure(props) @@ -182,11 +184,12 @@ object ConsumerGroupCommandTest { } } - class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String) + class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String, + customPropsOpt: Option[Properties] = None) extends AbstractConsumerGroupExecutor(numConsumers) { for (_ <- 1 to numConsumers) { - submit(new ConsumerRunnable(broker, groupId, topic, strategy)) + submit(new ConsumerRunnable(broker, groupId, topic, strategy, customPropsOpt)) } } diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index dce4cf928a6..88f9f4a6862 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -16,9 +16,11 @@ */ package kafka.admin +import java.util.Properties + import joptsimple.OptionException import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.RoundRobinAssignor +import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{TimeoutException} import org.junit.Assert._ @@ -605,5 +607,29 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { fail("Expected an error due to presence of unrecognized --new-consumer option") } + @Test + def testDescribeNonOffsetCommitGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + + val customProps = new Properties + // create a consumer group that never commits offsets + customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + // run one consumer in the group consuming from a single-partition topic + addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps)) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + val (state, assignments) = service.collectGroupOffsets() + state.contains("Stable") && + assignments.isDefined && + assignments.get.count(_.group == group) == 1 && + assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) && + assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) + }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.") + } + }