Browse Source

KAFKA-7141; Consumer group describe should include groups with no committed offsets (#5356)

Currently, if a consumer group never commits offsets, ConsumerGroupCommand will not include it in the describe output even if the member assignment is valid. Instead, the tool should be able to describe such a group, showing empty CURRENT-OFFSET and LAG columns.

Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Jason Gustafson <jason@confluent.io>
pull/5406/head
huxi 6 years ago committed by Jason Gustafson
parent
commit
bf237fa7c5
  1. 21
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
  2. 17
      core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
  3. 28
      core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala

21
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -27,7 +27,6 @@ import kafka.utils._ @@ -27,7 +27,6 @@ import kafka.utils._
import org.apache.kafka.clients.{CommonClientConfigs, admin}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition} @@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, Set}
import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}
object ConsumerGroupCommand extends Logging {
@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging { @@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging {
val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
var assignedTopicPartitions = ListBuffer[TopicPartition]()
val rowsWithConsumer = if (committedOffsets.isEmpty) List[PartitionAssignmentState]() else consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
.flatMap { consumerSummary =>
val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
.map { topicPartition =>
topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
}.toMap
val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
.map { topicPartition =>
topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
}.toMap
collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
}
val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>

17
core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala

@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { @@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
def addConsumerGroupExecutor(numConsumers: Int,
topic: String = topic,
group: String = group,
strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = {
val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy)
strategy: String = classOf[RangeAssignor].getName,
customPropsOpt: Option[Properties] = None): ConsumerGroupExecutor = {
val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy, customPropsOpt)
addExecutor(executor)
executor
}
@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { @@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
object ConsumerGroupCommandTest {
abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable {
abstract class AbstractConsumerRunnable(broker: String, groupId: String, customPropsOpt: Option[Properties] = None) extends Runnable {
val props = new Properties
configure(props)
customPropsOpt.foreach(props.asScala ++= _.asScala)
val consumer = new KafkaConsumer(props)
def configure(props: Properties): Unit = {
@ -145,8 +147,8 @@ object ConsumerGroupCommandTest { @@ -145,8 +147,8 @@ object ConsumerGroupCommandTest {
}
}
class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String)
extends AbstractConsumerRunnable(broker, groupId) {
class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String, customPropsOpt: Option[Properties] = None)
extends AbstractConsumerRunnable(broker, groupId, customPropsOpt) {
override def configure(props: Properties): Unit = {
super.configure(props)
@ -182,11 +184,12 @@ object ConsumerGroupCommandTest { @@ -182,11 +184,12 @@ object ConsumerGroupCommandTest {
}
}
class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String)
class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String,
customPropsOpt: Option[Properties] = None)
extends AbstractConsumerGroupExecutor(numConsumers) {
for (_ <- 1 to numConsumers) {
submit(new ConsumerRunnable(broker, groupId, topic, strategy))
submit(new ConsumerRunnable(broker, groupId, topic, strategy, customPropsOpt))
}
}

28
core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala

@ -16,9 +16,11 @@ @@ -16,9 +16,11 @@
*/
package kafka.admin
import java.util.Properties
import joptsimple.OptionException
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.RoundRobinAssignor
import org.apache.kafka.clients.consumer.{ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{TimeoutException}
import org.junit.Assert._
@ -605,5 +607,29 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { @@ -605,5 +607,29 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
fail("Expected an error due to presence of unrecognized --new-consumer option")
}
@Test
def testDescribeNonOffsetCommitGroup() {
  // Verifies that `--describe` reports a group that has a valid member assignment
  // but has never committed offsets (KAFKA-7141): the row must still appear with
  // real consumer id / client id / host values rather than being omitted.
  TestUtils.createOffsetsTopic(zkClient, servers)
  val customProps = new Properties
  // create a consumer group that never commits offsets
  customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  // run one consumer in the group consuming from a single-partition topic
  addConsumerGroupExecutor(numConsumers = 1, customPropsOpt = Some(customProps))
  val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
  val service = getConsumerGroupService(cgcArgs)
  TestUtils.waitUntilTrue(() => {
    val (state, assignments) = service.collectGroupOffsets()
    // Materialize the rows for this group once instead of re-filtering the
    // assignments (and calling .get/.head) for every column check below.
    // An undefined Option yields an empty Seq, so the size guard also covers it.
    val groupRows = assignments.toSeq.flatten.filter(_.group == group)
    state.contains("Stable") &&
      groupRows.size == 1 &&
      groupRows.head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
      groupRows.head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
      groupRows.head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
  }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group $group.")
}
}

Loading…
Cancel
Save