From 72282ed1988e33000f91672a72c7356e26658241 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Fri, 1 Nov 2019 14:18:48 +0530 Subject: [PATCH] MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric We would mistakenly increment the `OffsetCommits` metric instead Author: Stanislav Kozlovski Reviewers: David Jacot , Manikumar Reddy Closes #7624 from stanislavkozlovski/minor-fix-group-coordinator-offset-expiry-metric --- .../kafka/coordinator/group/GroupCoordinator.scala | 14 +++++++------- .../coordinator/group/GroupMetadataManager.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 22f15f9811f..24a17807d9d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int, def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, Errors]) = { var groupError: Errors = Errors.NONE var partitionErrors: Map[TopicPartition, Errors] = Map() - var partitionEligibleForDeletion: Seq[TopicPartition] = Seq() + var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq() validateGroupStatus(groupId, ApiKeys.OFFSET_DELETE) match { case Some(error) => @@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int, Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR case Empty => - partitionEligibleForDeletion = partitions + partitionsEligibleForDeletion = partitions case PreparingRebalance | CompletingRebalance | Stable if group.isConsumerGroup => val (consumed, notConsumed) = partitions.partition(tp => group.isSubscribedToTopic(tp.topic())) - partitionEligibleForDeletion = notConsumed + partitionsEligibleForDeletion = notConsumed partitionErrors = consumed.map(_ -> Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap case _ => @@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int, } } - if (partitionEligibleForDeletion.nonEmpty) { + if (partitionsEligibleForDeletion.nonEmpty) { val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), group => { - group.removeOffsets(partitionEligibleForDeletion) + group.removeOffsets(partitionsEligibleForDeletion) }) - partitionErrors ++= partitionEligibleForDeletion.map(_ -> Errors.NONE).toMap + partitionErrors ++= partitionsEligibleForDeletion.map(_ -> Errors.NONE).toMap offsetDeletionSensor.record(offsetsRemoved) - info(s"The following offsets of the group $groupId were deleted: ${partitionEligibleForDeletion.mkString(", ")}. " + + info(s"The following offsets of the group $groupId were deleted: ${partitionsEligibleForDeletion.mkString(", ")}. " + s"A total of $offsetsRemoved offsets were removed.") } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 2cbf7c8af29..a1436693c7e 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int, val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => { group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs) }) - offsetCommitsSensor.record(numOffsetsRemoved) + offsetExpiredSensor.record(numOffsetsRemoved) info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.") }