Browse Source

KAFKA-7142: fix joinGroup performance issues (#5354)

Summary:
1. Revert GroupMetadata.members to private
2. Add back a wrongly removed comment
3. In GroupMetadata.remove(), update supportedProtocols and numMembersAwaitingJoin only when the removal succeeded

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Sriharsha Chintalapani <sriharsha@apache.org>
pull/5468/head
ying-zheng 6 years ago committed by Harsha
parent
commit
b01f8fb668
  1. 17
      core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  2. 53
      core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
  3. 9
      core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
  4. 3
      core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala

17
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@@ -600,11 +600,9 @@ class GroupCoordinator(val brokerId: Int,
case Empty | Dead =>
case PreparingRebalance =>
for (member <- group.allMemberMetadata) {
if (member.awaitingJoinCallback != null) {
member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
member.awaitingJoinCallback = null
}
group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
}
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
case Stable | CompletingRebalance =>
@@ -704,12 +702,11 @@ class GroupCoordinator(val brokerId: Int,
val memberId = clientId + "-" + group.generateMemberIdSuffix
val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
member.awaitingJoinCallback = callback
// update the newMemberAdded flag to indicate that the join group can be further delayed
if (group.is(PreparingRebalance) && group.generationId == 0)
group.newMemberAdded = true
group.add(member)
group.add(member, callback)
maybePrepareRebalance(group, s"Adding new member $memberId")
member
}
@@ -718,8 +715,7 @@ class GroupCoordinator(val brokerId: Int,
member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback) {
member.supportedProtocols = protocols
member.awaitingJoinCallback = callback
group.updateMember(member, protocols, callback)
maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
}
@@ -765,7 +761,7 @@ class GroupCoordinator(val brokerId: Int,
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group.inLock {
if (group.notYetRejoinedMembers.isEmpty)
if (group.hasAllMembersJoined)
forceComplete()
else false
}
@@ -816,8 +812,7 @@ class GroupCoordinator(val brokerId: Int,
leaderId = group.leaderOrNull,
error = Errors.NONE)
member.awaitingJoinCallback(joinResult)
member.awaitingJoinCallback = null
group.invokeJoinCallback(member, joinResult)
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}

53
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala

@@ -128,7 +128,7 @@ private object GroupMetadata {
group.protocol = Option(protocol)
group.leaderId = Option(leaderId)
group.currentStateTimestamp = currentStateTimestamp
members.foreach(group.add)
members.foreach(group.add(_, null))
group
}
}
@@ -172,6 +172,8 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
*/
@nonthreadsafe
private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging {
type JoinCallback = JoinGroupResult => Unit
private[group] val lock = new ReentrantLock
private var state: GroupState = initialState
@@ -182,6 +184,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private var protocol: Option[String] = None
private val members = new mutable.HashMap[String, MemberMetadata]
private var numMembersAwaitingJoin = 0
private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
@@ -202,7 +206,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def protocolOrNull: String = protocol.orNull
def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1)
def add(member: MemberMetadata) {
def add(member: MemberMetadata, callback: JoinCallback = null) {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
@@ -213,10 +217,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
if (leaderId.isEmpty)
leaderId = Some(member.memberId)
members.put(member.memberId, member)
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
member.awaitingJoinCallback = callback
if (member.awaitingJoinCallback != null)
numMembersAwaitingJoin += 1;
}
def remove(memberId: String) {
members.remove(memberId)
members.remove(memberId).foreach { member =>
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
if (member.awaitingJoinCallback != null)
numMembersAwaitingJoin -= 1
}
if (isLeader(memberId)) {
leaderId = if (members.isEmpty) {
None
@@ -230,6 +243,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
def hasAllMembersJoined = members.size <= numMembersAwaitingJoin
def allMembers = members.keySet
def allMemberMetadata = members.values.toList
@@ -268,13 +283,37 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private def candidateProtocols = {
// get the set of protocols that are commonly supported by all members
allMemberMetadata
.map(_.protocols)
.reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
val numMembers = members.size
supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
}
def supportsProtocols(memberProtocols: Set[String]) = {
members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
val numMembers = members.size
members.isEmpty || memberProtocols.exists(supportedProtocols(_) == numMembers)
}
def updateMember(member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback) = {
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
member.supportedProtocols = protocols
if (callback != null && member.awaitingJoinCallback == null) {
numMembersAwaitingJoin += 1;
} else if (callback == null && member.awaitingJoinCallback != null) {
numMembersAwaitingJoin -= 1;
}
member.awaitingJoinCallback = callback
}
def invokeJoinCallback(member: MemberMetadata,
joinGroupResult: JoinGroupResult) : Unit = {
if (member.awaitingJoinCallback != null) {
member.awaitingJoinCallback(joinGroupResult)
member.awaitingJoinCallback = null
numMembersAwaitingJoin -= 1;
}
}
def initNextGeneration() = {

9
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala

@@ -843,8 +843,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
member.awaitingJoinCallback = _ => ()
group.add(member)
group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -873,8 +872,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
member.awaitingJoinCallback = _ => ()
group.add(member)
group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -1372,8 +1370,7 @@ class GroupMetadataManagerTest {
val subscription = new Subscription(List(topic).asJava)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
member.awaitingJoinCallback = _ => ()
group.add(member)
group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
group.initNextGeneration()

3
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala

@@ -266,8 +266,7 @@ class GroupMetadataTest extends JUnitSuite {
protocolType, List(("roundrobin", Array.empty[Byte])))
group.transitionTo(PreparingRebalance)
member.awaitingJoinCallback = _ => ()
group.add(member)
group.add(member, _ => ())
assertEquals(0, group.generationId)
assertNull(group.protocolOrNull)

Loading…
Cancel
Save