Browse Source

KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead

This patch fix differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message.

Author: Ishita Mandhan <imandha@us.ibm.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1429 from imandhan/KAFKA-3158
pull/1449/head
Ishita Mandhan 9 years ago committed by Ismael Juma
parent
commit
0aff450961
  1. 10
      core/src/main/scala/kafka/admin/AdminClient.scala
  2. 33
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
  3. 2
      core/src/test/scala/integration/kafka/api/AdminClientTest.scala

10
core/src/main/scala/kafka/admin/AdminClient.scala

@ -143,21 +143,21 @@ class AdminClient(val time: Time,
clientHost: String, clientHost: String,
assignment: List[TopicPartition]) assignment: List[TopicPartition])
def describeConsumerGroup(groupId: String): List[ConsumerSummary] = { def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = {
val group = describeGroup(groupId) val group = describeGroup(groupId)
if (group.state == "Dead") if (group.state == "Dead")
return List.empty[ConsumerSummary] return None
if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
if (group.state == "Stable") { if (group.state == "Stable") {
group.members.map { member => Some(group.members.map { member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
} })
} else { } else {
List.empty Some(List.empty)
} }
} }

33
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -312,22 +312,25 @@ object ConsumerGroupCommand {
} }
protected def describeGroup(group: String) { protected def describeGroup(group: String) {
val consumerSummaries = adminClient.describeConsumerGroup(group) adminClient.describeConsumerGroup(group) match {
if (consumerSummaries.isEmpty) case None => println(s"Consumer group `${group}` does not exist.")
println(s"Consumer group `${group}` does not exist or is rebalancing.") case Some(consumerSummaries) =>
else { if (consumerSummaries.isEmpty)
val consumer = getConsumer() println(s"Consumer group `${group}` is rebalancing.")
printDescribeHeader() else {
consumerSummaries.foreach { consumerSummary => val consumer = getConsumer()
val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) printDescribeHeader()
val partitionOffsets = topicPartitions.flatMap { topicPartition => consumerSummaries.foreach { consumerSummary =>
Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
topicPartition -> offsetAndMetadata.offset val partitionOffsets = topicPartitions.flatMap { topicPartition =>
Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
topicPartition -> offsetAndMetadata.offset
}
}.toMap
describeTopicPartition(group, topicPartitions, partitionOffsets.get,
_ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
} }
}.toMap }
describeTopicPartition(group, topicPartitions, partitionOffsets.get,
_ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
}
} }
} }

2
core/src/test/scala/integration/kafka/api/AdminClientTest.scala

@ -106,7 +106,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
val consumerSummaries = client.describeConsumerGroup(groupId) val consumerSummaries = client.describeConsumerGroup(groupId)
assertEquals(1, consumerSummaries.size) assertEquals(1, consumerSummaries.size)
assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet) assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet))
} }
@Test @Test

Loading…
Cancel
Save