From b01f8fb668988caa19feb63e878fe1901a9d0c89 Mon Sep 17 00:00:00 2001 From: ying-zheng Date: Mon, 6 Aug 2018 13:20:40 -0700 Subject: [PATCH] KAFKA-7142: fix joinGroup performance issues (#5354) Summary: 1. Revert GroupMetadata.members to private 2. Add back a wrongly removed comment 3. In GroupMetadata.remove(), update supportedProtocols and awaitingJoinCallbackMembers, only when the remove succeeded Reviewers: Jason Gustafson , Ismael Juma , Sriharsha Chintalapani --- .../coordinator/group/GroupCoordinator.scala | 17 +++--- .../coordinator/group/GroupMetadata.scala | 53 ++++++++++++++++--- .../group/GroupMetadataManagerTest.scala | 9 ++-- .../coordinator/group/GroupMetadataTest.scala | 3 +- 4 files changed, 56 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 6ca443f66eb..c4e6dc97137 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -600,11 +600,9 @@ class GroupCoordinator(val brokerId: Int, case Empty | Dead => case PreparingRebalance => for (member <- group.allMemberMetadata) { - if (member.awaitingJoinCallback != null) { - member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR)) - member.awaitingJoinCallback = null - } + group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR)) } + joinPurgatory.checkAndComplete(GroupKey(group.groupId)) case Stable | CompletingRebalance => @@ -704,12 +702,11 @@ class GroupCoordinator(val brokerId: Int, val memberId = clientId + "-" + group.generateMemberIdSuffix val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols) - member.awaitingJoinCallback = callback // update the newMemberAdded flag to indicate that the join group can be further delayed if (group.is(PreparingRebalance) && group.generationId == 0) group.newMemberAdded = true - group.add(member) + group.add(member, callback) maybePrepareRebalance(group, s"Adding new member $memberId") member } @@ -718,8 +715,7 @@ class GroupCoordinator(val brokerId: Int, member: MemberMetadata, protocols: List[(String, Array[Byte])], callback: JoinCallback) { - member.supportedProtocols = protocols - member.awaitingJoinCallback = callback + group.updateMember(member, protocols, callback) maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}") } @@ -765,7 +761,7 @@ class GroupCoordinator(val brokerId: Int, def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = { group.inLock { - if (group.notYetRejoinedMembers.isEmpty) + if (group.hasAllMembersJoined) forceComplete() else false } @@ -816,8 +812,7 @@ class GroupCoordinator(val brokerId: Int, leaderId = group.leaderOrNull, error = Errors.NONE) - member.awaitingJoinCallback(joinResult) - member.awaitingJoinCallback = null + group.invokeJoinCallback(member, joinResult) completeAndScheduleNextHeartbeatExpiration(group, member) } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index d729449af4e..cbe78e980b6 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -128,7 +128,7 @@ private object GroupMetadata { group.protocol = Option(protocol) group.leaderId = Option(leaderId) group.currentStateTimestamp = currentStateTimestamp - members.foreach(group.add) + members.foreach(group.add(_, null)) group } } @@ -172,6 +172,8 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs */ @nonthreadsafe private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging { + type JoinCallback = JoinGroupResult => Unit + private[group] val lock = new ReentrantLock private var state: GroupState = initialState @@ -182,6 +184,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState private var protocol: Option[String] = None private val members = new mutable.HashMap[String, MemberMetadata] + private var numMembersAwaitingJoin = 0 + private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0) private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]() @@ -202,7 +206,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def protocolOrNull: String = protocol.orNull def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1) - def add(member: MemberMetadata) { + def add(member: MemberMetadata, callback: JoinCallback = null) { if (members.isEmpty) this.protocolType = Some(member.protocolType) @@ -213,10 +217,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState if (leaderId.isEmpty) leaderId = Some(member.memberId) members.put(member.memberId, member) + member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 } + member.awaitingJoinCallback = callback + if (member.awaitingJoinCallback != null) + numMembersAwaitingJoin += 1; } def remove(memberId: String) { - members.remove(memberId) + members.remove(memberId).foreach { member => + member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 } + if (member.awaitingJoinCallback != null) + numMembersAwaitingJoin -= 1 + } + if (isLeader(memberId)) { leaderId = if (members.isEmpty) { None @@ -230,6 +243,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList + def hasAllMembersJoined = members.size <= numMembersAwaitingJoin + def allMembers = members.keySet def allMemberMetadata = members.values.toList @@ -268,13 +283,37 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState private def candidateProtocols = { // get the set of protocols that are commonly supported by all members - allMemberMetadata - .map(_.protocols) - .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols) + val numMembers = members.size + supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet } def supportsProtocols(memberProtocols: Set[String]) = { - members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty + val numMembers = members.size + members.isEmpty || memberProtocols.exists(supportedProtocols(_) == numMembers) + } + + def updateMember(member: MemberMetadata, + protocols: List[(String, Array[Byte])], + callback: JoinCallback) = { + member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 } + protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 } + member.supportedProtocols = protocols + + if (callback != null && member.awaitingJoinCallback == null) { + numMembersAwaitingJoin += 1; + } else if (callback == null && member.awaitingJoinCallback != null) { + numMembersAwaitingJoin -= 1; + } + member.awaitingJoinCallback = callback + } + + def invokeJoinCallback(member: MemberMetadata, + joinGroupResult: JoinGroupResult) : Unit = { + if (member.awaitingJoinCallback != null) { + member.awaitingJoinCallback(joinGroupResult) + member.awaitingJoinCallback = null + numMembersAwaitingJoin -= 1; + } } def initNextGeneration() = { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 77e6fdc3683..21c13658e79 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -843,8 +843,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -873,8 +872,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -1372,8 +1370,7 @@ class GroupMetadataManagerTest { val subscription = new Subscription(List(topic).asJava) val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array()))) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) group.transitionTo(PreparingRebalance) group.initNextGeneration() diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 90545339ca5..ac12804b1d2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -266,8 +266,7 @@ class GroupMetadataTest extends JUnitSuite { protocolType, List(("roundrobin", Array.empty[Byte]))) group.transitionTo(PreparingRebalance) - member.awaitingJoinCallback = _ => () - group.add(member) + group.add(member, _ => ()) assertEquals(0, group.generationId) assertNull(group.protocolOrNull)