From 0aff450961a8dd14cc7820ee8d1c9eea855439b0 Mon Sep 17 00:00:00 2001 From: Ishita Mandhan Date: Sat, 28 May 2016 23:30:10 +0100 Subject: [PATCH] KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead This patch fix differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message. Author: Ishita Mandhan Reviewers: Vahid Hashemian , Jason Gustafson , Ismael Juma Closes #1429 from imandhan/KAFKA-3158 --- .../main/scala/kafka/admin/AdminClient.scala | 10 +++--- .../kafka/admin/ConsumerGroupCommand.scala | 33 ++++++++++--------- .../kafka/api/AdminClientTest.scala | 2 +- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ebb5026cf8b..8572cebe3a2 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -143,21 +143,21 @@ class AdminClient(val time: Time, clientHost: String, assignment: List[TopicPartition]) - def describeConsumerGroup(groupId: String): List[ConsumerSummary] = { + def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = { val group = describeGroup(groupId) if (group.state == "Dead") - return List.empty[ConsumerSummary] + return None if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") if (group.state == "Stable") { - group.members.map { member => + Some(group.members.map { member => val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) - } + }) } else { - List.empty + Some(List.empty) } } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 414e7baee44..b086d8f64ca 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -312,22 +312,25 @@ object ConsumerGroupCommand { } protected def describeGroup(group: String) { - val consumerSummaries = adminClient.describeConsumerGroup(group) - if (consumerSummaries.isEmpty) - println(s"Consumer group `${group}` does not exist or is rebalancing.") - else { - val consumer = getConsumer() - printDescribeHeader() - consumerSummaries.foreach { consumerSummary => - val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) - val partitionOffsets = topicPartitions.flatMap { topicPartition => - Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => - topicPartition -> offsetAndMetadata.offset + adminClient.describeConsumerGroup(group) match { + case None => println(s"Consumer group `${group}` does not exist.") + case Some(consumerSummaries) => + if (consumerSummaries.isEmpty) + println(s"Consumer group `${group}` is rebalancing.") + else { + val consumer = getConsumer() + printDescribeHeader() + consumerSummaries.foreach { consumerSummary => + val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) + val partitionOffsets = topicPartitions.flatMap { topicPartition => + Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => + topicPartition -> offsetAndMetadata.offset + } + }.toMap + describeTopicPartition(group, topicPartitions, partitionOffsets.get, + _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) } - }.toMap - describeTopicPartition(group, topicPartitions, partitionOffsets.get, - _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) - } + } } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index 7fae81e8622..3d39475d3d0 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -106,7 +106,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging { val consumerSummaries = client.describeConsumerGroup(groupId) assertEquals(1, consumerSummaries.size) - assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet) + assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet)) } @Test