Browse Source

KAFKA-4349; Handle 'PreparingRebalance' and 'AwaitingSync' states in consumer group describe

The edge case where consumer group state is `PreparingRebalance` or `AwaitingSync` will be separately handled as the group assignment is not yet determined.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2070 from vahidhashemian/KAFKA-4349
pull/2070/merge
Vahid Hashemian 8 years ago committed by Jason Gustafson
parent
commit
34e9cc5dfa
  1. 8
      core/src/main/scala/kafka/admin/AdminClient.scala
  2. 13
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

8
core/src/main/scala/kafka/admin/AdminClient.scala

@ -149,9 +149,15 @@ class AdminClient(val time: Time,
Errors.forCode(metadata.errorCode()).maybeThrow() Errors.forCode(metadata.errorCode()).maybeThrow()
val consumers = metadata.members.map { consumer => val consumers = metadata.members.map { consumer =>
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
case "Stable" =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment))) val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, assignment.partitions.toList) assignment.partitions.toList
case _ =>
List()
})
}.toList }.toList
ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator) ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
} }

13
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -73,20 +73,23 @@ object ConsumerGroupCommand extends Logging {
case None => case None =>
printError(s"The consumer group '$groupId' does not exist.") printError(s"The consumer group '$groupId' does not exist.")
case Some(assignments) => case Some(assignments) =>
if (assignments.isEmpty)
state match { state match {
case Some("Dead") => case Some("Dead") =>
printError(s"Consumer group '$groupId' does not exist.") printError(s"Consumer group '$groupId' does not exist.")
case Some("Empty") => case Some("Empty") =>
printError(s"Consumer group '$groupId' has no active members.") printError(s"Consumer group '$groupId' has no active members.")
case Some(_) => case Some("PreparingRebalance") | Some("AwaitingSync") =>
printError(s"Consumer group '$groupId' is rebalancing.") System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
printAssignment(assignments, !opts.useOldConsumer)
case Some("Stable") =>
printAssignment(assignments, !opts.useOldConsumer)
case Some(other) =>
// the control should never reach here
throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.")
case None => case None =>
// the control should never reach here // the control should never reach here
throw new KafkaException("Expected a valid consumer group state, but none found.") throw new KafkaException("Expected a valid consumer group state, but none found.")
} }
else
printAssignment(assignments, !opts.useOldConsumer)
} }
} }
else if (opts.options.has(opts.deleteOpt)) { else if (opts.options.has(opts.deleteOpt)) {

Loading…
Cancel
Save