Browse Source

MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric

We would mistakenly increment the `OffsetCommits` metric instead

Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>

Reviewers: David Jacot <djacot@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #7624 from stanislavkozlovski/minor-fix-group-coordinator-offset-expiry-metric
pull/7633/head
Stanislav Kozlovski 5 years ago committed by Manikumar Reddy
parent
commit
72282ed198
  1. 14
      core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  2. 2
      core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

14
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int, @@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, Errors]) = {
var groupError: Errors = Errors.NONE
var partitionErrors: Map[TopicPartition, Errors] = Map()
var partitionEligibleForDeletion: Seq[TopicPartition] = Seq()
var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
validateGroupStatus(groupId, ApiKeys.OFFSET_DELETE) match {
case Some(error) =>
@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int, @@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int,
Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
case Empty =>
partitionEligibleForDeletion = partitions
partitionsEligibleForDeletion = partitions
case PreparingRebalance | CompletingRebalance | Stable if group.isConsumerGroup =>
val (consumed, notConsumed) =
partitions.partition(tp => group.isSubscribedToTopic(tp.topic()))
partitionEligibleForDeletion = notConsumed
partitionsEligibleForDeletion = notConsumed
partitionErrors = consumed.map(_ -> Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
case _ =>
@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int, @@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int,
}
}
if (partitionEligibleForDeletion.nonEmpty) {
if (partitionsEligibleForDeletion.nonEmpty) {
val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), group => {
group.removeOffsets(partitionEligibleForDeletion)
group.removeOffsets(partitionsEligibleForDeletion)
})
partitionErrors ++= partitionEligibleForDeletion.map(_ -> Errors.NONE).toMap
partitionErrors ++= partitionsEligibleForDeletion.map(_ -> Errors.NONE).toMap
offsetDeletionSensor.record(offsetsRemoved)
info(s"The following offsets of the group $groupId were deleted: ${partitionEligibleForDeletion.mkString(", ")}. " +
info(s"The following offsets of the group $groupId were deleted: ${partitionsEligibleForDeletion.mkString(", ")}. " +
s"A total of $offsetsRemoved offsets were removed.")
}
}

2
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int, @@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int,
val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
})
offsetCommitsSensor.record(numOffsetsRemoved)
offsetExpiredSensor.record(numOffsetsRemoved)
info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
}

Loading…
Cancel
Save