
MINOR: More log4j entries on election / resignation of coordinators (#9416)

When a coordinator module is elected or resigns, our log entries are usually associated with a background scheduler loading / unloading entries, so the exact time at which the election or resignation happened is unclear, and we then have to compare against KafkaApis' log entries for LeaderAndIsr / StopReplica to infer the actual time. I think adding a couple of new log entries indicating the exact moment it happens is helpful.

Reviewers: Boyang Chen <boyang@confluent.io>, Lee Dongjin <dongjin@apache.org>, Bruno Cadonna <bruno@confluent.io>
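
For illustration, the new entries should line up in the broker log roughly as follows; the timestamps, partition number, and log4j layout here are invented for the example, but the messages themselves come from the diff below:

    [2020-10-12 10:15:01,123] INFO Elected as the group coordinator for partition 7 (kafka.coordinator.group.GroupCoordinator)
    [2020-10-12 10:15:01,480] DEBUG Started loading offsets and group metadata from __consumer_offsets-7 (kafka.coordinator.group.GroupMetadataManager)

The INFO entry now marks the moment of the election itself; the DEBUG entry (and the existing load/unload entries) may appear noticeably later, once the scheduler gets to the work.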
Guozhang Wang committed 4 years ago via GitHub (commit 236daf294d)
Changed files (2 additions each):
  1. core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  2. core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
  3. core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala

core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (2 additions)

@@ -908,6 +908,7 @@ class GroupCoordinator(val brokerId: Int,
    * @param offsetTopicPartitionId The partition we are now leading
    */
   def onElection(offsetTopicPartitionId: Int): Unit = {
+    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
     groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
   }
@@ -917,6 +918,7 @@ class GroupCoordinator(val brokerId: Int,
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
   def onResignation(offsetTopicPartitionId: Int): Unit = {
+    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
     groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
   }
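
For readers outside the Kafka codebase, the shape of this change is: log at the moment of the state transition, then hand off to the background scheduler, so the INFO entry timestamps the election rather than the load. A minimal standalone sketch of that pattern, using a plain JDK executor and hypothetical names rather than Kafka's scheduler:

    import java.util.concurrent.Executors

    object ElectionLoggingSketch {
      private val scheduler = Executors.newSingleThreadExecutor()

      // Log first so the entry marks the election time, then schedule the
      // (potentially slow) load, whose own entries may appear much later.
      def onElection(partition: Int): Unit = {
        println(s"Elected as the group coordinator for partition $partition")
        scheduler.submit(new Runnable {
          override def run(): Unit =
            println(s"Finished loading state for partition $partition")
        })
      }

      def main(args: Array[String]): Unit = {
        onElection(7)
        scheduler.shutdown()
      }
    }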

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (2 additions)

@@ -544,6 +544,7 @@ class GroupMetadataManager(brokerId: Int,
   private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit, startTimeMs: java.lang.Long): Unit = {
     try {
       val schedulerTimeMs = time.milliseconds() - startTimeMs
+      debug(s"Started loading offsets and group metadata from $topicPartition")
       doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
       val endTimeMs = time.milliseconds()
       val totalLoadingTimeMs = endTimeMs - startTimeMs
@@ -759,6 +760,7 @@ class GroupMetadataManager(brokerId: Int,
     var numOffsetsRemoved = 0
     var numGroupsRemoved = 0
+    debug(s"Started unloading offsets and group metadata for $topicPartition")
     inLock(partitionLock) {
       // we need to guard the group removal in cache in the loading partition lock
       // to prevent coordinator's check-and-get-group race condition
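
As context for the startTimeMs parameter visible above: it is captured when the load is scheduled, so the manager can separate time spent queued in the scheduler from the load itself. A standalone sketch of that bookkeeping, with System.currentTimeMillis standing in for Kafka's Time abstraction and sleeps standing in for real work:

    object LoadTimingSketch {
      def main(args: Array[String]): Unit = {
        val startTimeMs = System.currentTimeMillis() // captured when the load was scheduled
        Thread.sleep(50)                             // stand-in for time spent queued in the scheduler
        val schedulerTimeMs = System.currentTimeMillis() - startTimeMs
        Thread.sleep(100)                            // stand-in for the load itself
        val totalLoadingTimeMs = System.currentTimeMillis() - startTimeMs
        println(s"waited $schedulerTimeMs ms in scheduler, $totalLoadingTimeMs ms total")
      }
    }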

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala (2 additions)

@@ -311,6 +311,7 @@ class TransactionCoordinator(brokerId: Int,
    * @param coordinatorEpoch The partition coordinator (or leader) epoch from the received LeaderAndIsr request
    */
   def onElection(txnTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
     // The operations performed during immigration must be resilient to any previous errors we saw or partial state we
     // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue
     // loading it.
@@ -329,6 +330,7 @@ class TransactionCoordinator(brokerId: Int,
    * are resigning after receiving a StopReplica request from the controller
    */
   def onResignation(txnTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    info(s"Resigned as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch")
     coordinatorEpoch match {
       case Some(epoch) =>
         txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId, epoch)
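
One detail worth noting in review: onResignation takes coordinatorEpoch as an Option[Int], and interpolating an Option directly prints the wrapper, so this entry will read "at epoch Some(11)" or "at epoch None" rather than a bare number. A standalone sketch of the difference:

    object OptionInterpolationSketch {
      def main(args: Array[String]): Unit = {
        val coordinatorEpoch: Option[Int] = Some(11)
        println(s"at epoch $coordinatorEpoch")                        // prints: at epoch Some(11)
        println(s"at epoch ${coordinatorEpoch.getOrElse("unknown")}") // prints: at epoch 11
      }
    }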
