Browse Source

KAFKA-7824; Require member.id for initial join group request [KIP-394] (#6058)

This patch implements KIP-394 as documented in https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request.

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jason Gustafson <jason@confluent.io>
pull/6155/head
Boyang Chen 6 years ago committed by Jason Gustafson
parent
commit
9a9310d074
  1. 32
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
  2. 30
      clients/src/main/java/org/apache/kafka/common/errors/MemberIdRequiredException.java
  3. 5
      clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  4. 9
      clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
  5. 9
      clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
  6. 4
      clients/src/main/resources/common/message/JoinGroupRequest.json
  7. 4
      clients/src/main/resources/common/message/JoinGroupResponse.json
  8. 39
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
  9. 7
      core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
  10. 2
      core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
  11. 230
      core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  12. 20
      core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
  13. 6
      core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
  14. 4
      core/src/main/scala/kafka/server/KafkaApis.scala
  15. 2
      core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
  16. 2
      core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
  17. 68
      core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
  18. 15
      core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala

32
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.DisconnectException; @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@ -415,7 +416,8 @@ public abstract class AbstractCoordinator implements Closeable { @@ -415,7 +416,8 @@ public abstract class AbstractCoordinator implements Closeable {
final RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException)
exception instanceof IllegalGenerationException ||
exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
@ -548,6 +550,16 @@ public abstract class AbstractCoordinator implements Closeable { @@ -548,6 +550,16 @@ public abstract class AbstractCoordinator implements Closeable {
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
joinResponse.memberId(), null);
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(Errors.MEMBER_ID_REQUIRED);
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
@ -729,6 +741,16 @@ public abstract class AbstractCoordinator implements Closeable { @@ -729,6 +741,16 @@ public abstract class AbstractCoordinator implements Closeable {
return generation;
}
/**
* Check whether given generation id is matching the record within current generation.
* Only using in unit tests.
* @param generationId generation id
* @return true if the two ids are matching.
*/
final synchronized boolean hasMatchingGenerationId(int generationId) {
return generation != null && generation.generationId == generationId;
}
/**
* Reset the generation and memberId because we have fallen out of the group.
*/
@ -777,7 +799,7 @@ public abstract class AbstractCoordinator implements Closeable { @@ -777,7 +799,7 @@ public abstract class AbstractCoordinator implements Closeable {
* Leave the current group and reset local generation/memberId.
*/
public synchronized void maybeLeaveGroup() {
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.isValid()) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
log.info("Sending LeaveGroup request to coordinator {}", coordinator);
@ -1078,7 +1100,7 @@ public abstract class AbstractCoordinator implements Closeable { @@ -1078,7 +1100,7 @@ public abstract class AbstractCoordinator implements Closeable {
protected static class Generation {
public static final Generation NO_GENERATION = new Generation(
OffsetCommitRequest.DEFAULT_GENERATION_ID,
JoinGroupRequest.UNKNOWN_MEMBER_ID,
JoinGroupResponse.UNKNOWN_MEMBER_ID,
null);
public final int generationId;
@ -1091,6 +1113,10 @@ public abstract class AbstractCoordinator implements Closeable { @@ -1091,6 +1113,10 @@ public abstract class AbstractCoordinator implements Closeable {
this.protocol = protocol;
}
public boolean isValid() {
return generationId != OffsetCommitRequest.DEFAULT_GENERATION_ID;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;

30
clients/src/main/java/org/apache/kafka/common/errors/MemberIdRequiredException.java

@ -0,0 +1,30 @@ @@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class MemberIdRequiredException extends ApiException {
private static final long serialVersionUID = 1L;
public MemberIdRequiredException(String message) {
super(message);
}
public MemberIdRequiredException(String message, Throwable cause) {
super(message, cause);
}
}

5
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@ -59,6 +59,7 @@ import org.apache.kafka.common.errors.InvalidTxnTimeoutException; @@ -59,6 +59,7 @@ import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorException;
@ -294,7 +295,9 @@ public enum Errors { @@ -294,7 +295,9 @@ public enum Errors {
StaleBrokerEpochException::new),
OFFSET_NOT_AVAILABLE(78, "The leader high watermark has not caught up from a recent leader " +
"election so the offsets cannot be guaranteed to be monotonically increasing",
OffsetNotAvailableException::new);
OffsetNotAvailableException::new),
MEMBER_ID_REQUIRED(79, "The group member needs to have a valid member id before actually entering a consumer group",
MemberIdRequiredException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

9
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java

@ -76,9 +76,15 @@ public class JoinGroupRequest extends AbstractRequest { @@ -76,9 +76,15 @@ public class JoinGroupRequest extends AbstractRequest {
*/
private static final Schema JOIN_GROUP_REQUEST_V3 = JOIN_GROUP_REQUEST_V2;
/**
* The version number is bumped to indicate that client needs to issue a second join group request under first try
* with UNKNOWN_MEMBER_ID.
*/
private static final Schema JOIN_GROUP_REQUEST_V4 = JOIN_GROUP_REQUEST_V3;
public static Schema[] schemaVersions() {
return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2,
JOIN_GROUP_REQUEST_V3};
JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}
public static final String UNKNOWN_MEMBER_ID = "";
@ -209,6 +215,7 @@ public class JoinGroupRequest extends AbstractRequest { @@ -209,6 +215,7 @@ public class JoinGroupRequest extends AbstractRequest {
Collections.emptyMap());
case 2:
case 3:
case 4:
return new JoinGroupResponse(
throttleTimeMs,
Errors.forException(e),

9
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java

@ -73,9 +73,15 @@ public class JoinGroupResponse extends AbstractResponse { @@ -73,9 +73,15 @@ public class JoinGroupResponse extends AbstractResponse {
*/
private static final Schema JOIN_GROUP_RESPONSE_V3 = JOIN_GROUP_RESPONSE_V2;
/**
* The version number is bumped to indicate that client needs to issue a second join group request under first try
* with UNKNOWN_MEMBER_ID.
*/
private static final Schema JOIN_GROUP_RESPONSE_V4 = JOIN_GROUP_RESPONSE_V3;
public static Schema[] schemaVersions() {
return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2,
JOIN_GROUP_RESPONSE_V3};
JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}
public static final String UNKNOWN_PROTOCOL = "";
@ -92,6 +98,7 @@ public class JoinGroupResponse extends AbstractResponse { @@ -92,6 +98,7 @@ public class JoinGroupResponse extends AbstractResponse {
* UNKNOWN_MEMBER_ID (25)
* INVALID_SESSION_TIMEOUT (26)
* GROUP_AUTHORIZATION_FAILED (30)
* MEMBER_ID_REQUIRED (79)
*/
private final int throttleTimeMs;

4
clients/src/main/resources/common/message/JoinGroupRequest.json

@ -19,7 +19,9 @@ @@ -19,7 +19,9 @@
"name": "JoinGroupRequest",
// Version 1 adds RebalanceTimeoutMs.
// Version 2 and 3 are the same as version 1.
"validVersions": "0-3",
// Starting from version 4, the client needs to issue a second request to join group
// with assigned id.
"validVersions": "0-4",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about": "The group identifier." },

4
clients/src/main/resources/common/message/JoinGroupResponse.json

@ -20,7 +20,9 @@ @@ -20,7 +20,9 @@
// Version 1 is the same as version 0.
// Version 2 adds throttle time.
// Starting in version 3, on quota violation, brokers send out responses before throttling.
"validVersions": "0-3",
// Starting in version 4, the client needs to issue a second request to join group
// with assigned id.
"validVersions": "0-4",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

39
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

@ -176,6 +176,40 @@ public class AbstractCoordinatorTest { @@ -176,6 +176,40 @@ public class AbstractCoordinatorTest {
assertTrue(consumerClient.poll(future, mockTime.timer(0)));
}
@Test
public void testJoinGroupRequestWithMemberIdRequired() {
setupCoordinator();
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));
final String memberId = "memberId";
final int generation = -1;
mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupResponse.UNKNOWN_MEMBER_ID, Errors.MEMBER_ID_REQUIRED));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
if (!(body instanceof JoinGroupRequest)) {
return false;
}
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
if (!joinGroupRequest.memberId().equals(memberId)) {
return false;
}
return true;
}
}, joinGroupResponse(Errors.UNKNOWN_MEMBER_ID));
RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
assertEquals(Errors.MEMBER_ID_REQUIRED.message(), future.exception().getMessage());
assertTrue(coordinator.rejoinNeededOrPending());
assertTrue(coordinator.hasMatchingGenerationId(generation));
future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future, mockTime.timer(REBALANCE_TIMEOUT_MS)));
}
@Test
public void testUncaughtExceptionInHeartbeatThread() throws Exception {
setupCoordinator();
@ -639,6 +673,11 @@ public class AbstractCoordinatorTest { @@ -639,6 +673,11 @@ public class AbstractCoordinatorTest {
Collections.<String, ByteBuffer>emptyMap());
}
private JoinGroupResponse joinGroupResponse(Errors error) {
return joinGroupFollowerResponse(JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_MEMBER_ID, JoinGroupResponse.UNKNOWN_MEMBER_ID, error);
}
private SyncGroupResponse syncGroupResponse(Errors error) {
return new SyncGroupResponse(error, ByteBuffer.allocate(0));
}

7
core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala

@ -25,12 +25,13 @@ import kafka.server.DelayedOperation @@ -25,12 +25,13 @@ import kafka.server.DelayedOperation
*/
private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
group: GroupMetadata,
member: MemberMetadata,
memberId: String,
isPending: Boolean,
deadline: Long,
timeoutMs: Long)
extends DelayedOperation(timeoutMs, Some(group.lock)) {
override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _)
override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline)
override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, memberId, isPending, deadline, forceComplete _)
override def onExpiration() = coordinator.onExpireHeartbeat(group, memberId, isPending, deadline)
override def onComplete() = coordinator.onCompleteHeartbeat()
}

2
core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala

@ -58,7 +58,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator, @@ -58,7 +58,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
override def tryComplete(): Boolean = false
override def onComplete(): Unit = {
group.inLock {
group.inLock {
if (group.newMemberAdded && remainingMs != 0) {
group.newMemberAdded = false
val delay = min(configuredRebalanceDelay, remainingMs)

230
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@ -101,6 +101,7 @@ class GroupCoordinator(val brokerId: Int, @@ -101,6 +101,7 @@ class GroupCoordinator(val brokerId: Int,
def handleJoinGroup(groupId: String,
memberId: String,
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
@ -117,24 +118,63 @@ class GroupCoordinator(val brokerId: Int, @@ -117,24 +118,63 @@ class GroupCoordinator(val brokerId: Int,
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
} else {
// only try to create the group if the group is not unknown AND
// the member id is UNKNOWN, if member is specified but group does not
// exist we should reject the request
groupManager.getGroup(groupId) match {
case None =>
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
// only try to create the group if the group is UNKNOWN AND
// the member id is UNKNOWN, if member is specified but group does not
// exist we should reject the request.
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
doUnknownJoinGroup(group, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
}
case Some(group) =>
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
doUnknownJoinGroup(group, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
}
}
}
private def doUnknownJoinGroup(group: GroupMetadata,
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
group.inLock {
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// joining without the specified member id.
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
val newMemberId = clientId + "-" + group.generateMemberIdSuffix
if (requireKnownMemberId) {
// If member id required, register the member in the pending member list
// and send back a response to call for another join group request with allocated member id.
group.addPendingMember(newMemberId)
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
responseCallback(joinError(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, clientId, clientHost, protocolType,
protocols, group, responseCallback)
}
}
}
}
private def doJoinGroup(group: GroupMetadata,
memberId: String,
clientId: String,
@ -145,84 +185,74 @@ class GroupCoordinator(val brokerId: Int, @@ -145,84 +185,74 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
group.inLock {
if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.is(Empty) && (protocols.isEmpty || protocolType.isEmpty)) {
//reject if first member with empty group protocol or protocolType is empty
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// joining without the specified member id.
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
} else if (group.isPendingMember(memberId)) {
// A rejoining pending member will be accepted.
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, clientId, clientHost, protocolType,
protocols, group, responseCallback)
} else if (!group.has(memberId)) {
// if the member trying to register with a un-recognized id, send the response to let
// it reset its member id and retry
// it reset its member id and retry.
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
group.currentState match {
case Dead =>
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// joining without the specified member id,
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
protocols, group, responseCallback)
} else {
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
case CompletingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
protocols, group, responseCallback)
val member = group.get(memberId)
if (member.matches(protocols)) {
// member is joining with the same metadata (which could be because it failed to
// receive the initial JoinGroup response), so just return current group information
// for the current generation.
responseCallback(JoinGroupResult(
members = if (group.isLeader(memberId)) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE))
} else {
val member = group.get(memberId)
if (member.matches(protocols)) {
// member is joining with the same metadata (which could be because it failed to
// receive the initial JoinGroup response), so just return current group information
// for the current generation.
responseCallback(JoinGroupResult(
members = if (group.isLeader(memberId)) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE))
} else {
// member has changed metadata, so force a rebalance
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
// member has changed metadata, so force a rebalance
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
case Empty | Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
protocols, group, responseCallback)
case Stable =>
val member = group.get(memberId)
if (group.isLeader(memberId) || !member.matches(protocols)) {
// force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
// The latter allows the leader to trigger rebalances for changes affecting assignment
// which do not affect the member metadata (such as topic metadata changes for the consumer)
updateMemberAndRebalance(group, member, protocols, responseCallback)
} else {
val member = group.get(memberId)
if (group.isLeader(memberId) || !member.matches(protocols)) {
// force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
// The latter allows the leader to trigger rebalances for changes affecting assignment
// which do not affect the member metadata (such as topic metadata changes for the consumer)
updateMemberAndRebalance(group, member, protocols, responseCallback)
} else {
// for followers with no actual change to their metadata, just return group information
// for the current generation which will allow them to issue SyncGroup
responseCallback(JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE))
}
// for followers with no actual change to their metadata, just return group information
// for the current generation which will allow them to issue SyncGroup
responseCallback(JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE))
}
case Empty | Dead =>
// Group reaches unexpected state. Let the joining member reset their generation and rejoin.
warn(s"Attempt to add rejoining member ${memberId} of group ${group.groupId} in " +
s"unexpected group state ${group.currentState}")
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
}
if (group.is(PreparingRebalance))
@ -320,7 +350,7 @@ class GroupCoordinator(val brokerId: Int, @@ -320,7 +350,7 @@ class GroupCoordinator(val brokerId: Int,
groupManager.getGroup(groupId) match {
case None =>
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the consumer to retry
// joining without specified consumer id,
responseCallback(Errors.UNKNOWN_MEMBER_ID)
@ -402,9 +432,9 @@ class GroupCoordinator(val brokerId: Int, @@ -402,9 +432,9 @@ class GroupCoordinator(val brokerId: Int,
group.currentState match {
case Dead =>
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// joining without the specified member id,
// joining without the specified member id.
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Empty =>
@ -664,7 +694,7 @@ class GroupCoordinator(val brokerId: Int, @@ -664,7 +694,7 @@ class GroupCoordinator(val brokerId: Int,
JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = 0,
generationId = GroupCoordinator.NoGeneration,
subProtocol = GroupCoordinator.NoProtocol,
leaderId = GroupCoordinator.NoLeader,
error = error)
@ -677,7 +707,7 @@ class GroupCoordinator(val brokerId: Int, @@ -677,7 +707,7 @@ class GroupCoordinator(val brokerId: Int,
completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
}
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long) {
// complete current heartbeat expectation
member.latestHeartbeat = time.milliseconds()
val memberKey = MemberKey(member.groupId, member.memberId)
@ -685,10 +715,20 @@ class GroupCoordinator(val brokerId: Int, @@ -685,10 +715,20 @@ class GroupCoordinator(val brokerId: Int,
// reschedule the next heartbeat expiration deadline
val deadline = member.latestHeartbeat + timeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs)
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
/**
* Add pending member expiration to heartbeat purgatory
*/
private def addPendingMemberExpiration(group: GroupMetadata, pendingMemberId: String, timeoutMs: Long) {
val pendingMemberKey = MemberKey(group.groupId, pendingMemberId)
val deadline = time.milliseconds() + timeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, pendingMemberId, isPending = true, deadline, timeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
}
private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
member.isLeaving = true
val memberKey = MemberKey(member.groupId, member.memberId)
@ -697,13 +737,13 @@ class GroupCoordinator(val brokerId: Int, @@ -697,13 +737,13 @@ class GroupCoordinator(val brokerId: Int,
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
memberId: String,
clientId: String,
clientHost: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): MemberMetadata = {
val memberId = clientId + "-" + group.generateMemberIdSuffix
val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
@ -724,6 +764,7 @@ class GroupCoordinator(val brokerId: Int, @@ -724,6 +764,7 @@ class GroupCoordinator(val brokerId: Int,
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
maybePrepareRebalance(group, s"Adding new member $memberId")
group.removePendingMember(memberId)
member
}
@ -844,19 +885,32 @@ class GroupCoordinator(val brokerId: Int, @@ -844,19 +885,32 @@ class GroupCoordinator(val brokerId: Int,
}
}
def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
group.inLock {
if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving)
forceComplete()
else false
if (isPending)
group.has(memberId)
else {
val member = group.get(memberId)
if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving) {
forceComplete()
} else false
}
}
}
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long) {
group.inLock {
if (!member.shouldKeepAlive(heartbeatDeadline)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
if (isPending) {
debug(s"Pending member $memberId has been removed after session timeout expiration.")
group.removePendingMember(memberId)
} else if (!group.has(memberId)) {
debug(s"Member $memberId has already been removed from the group.")
} else {
val member = group.get(memberId)
if (!member.shouldKeepAlive(heartbeatDeadline)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
}
}
}
}

20
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala

@ -184,6 +184,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -184,6 +184,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private var protocol: Option[String] = None
private val members = new mutable.HashMap[String, MemberMetadata]
private val pendingMembers = new mutable.HashSet[String]
private var numMembersAwaitingJoin = 0
private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
@ -212,7 +213,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -212,7 +213,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
assert(groupId == member.groupId)
assert(this.protocolType.orNull == member.protocolType)
assert(supportsProtocols(member.protocols))
assert(supportsProtocols(member.protocolType, MemberMetadata.plainProtocolSet(member.supportedProtocols)))
if (leaderId.isEmpty)
leaderId = Some(member.memberId)
@ -239,11 +240,17 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -239,11 +240,17 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}
}
def isPendingMember(memberId: String): Boolean = pendingMembers.contains(memberId) && !has(memberId)
def addPendingMember(memberId: String) = pendingMembers.add(memberId)
def removePendingMember(memberId: String) = pendingMembers.remove(memberId)
def currentState = state
def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
def hasAllMembersJoined = members.size <= numMembersAwaitingJoin
def hasAllMembersJoined = members.size <= numMembersAwaitingJoin && pendingMembers.isEmpty
def allMembers = members.keySet
@ -253,7 +260,6 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -253,7 +260,6 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
timeout.max(member.rebalanceTimeoutMs)
}
// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString
def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
@ -287,9 +293,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -287,9 +293,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
}
def supportsProtocols(memberProtocols: Set[String]) = {
val numMembers = members.size
members.isEmpty || memberProtocols.exists(supportedProtocols(_) == numMembers)
def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]) = {
if (is(Empty))
!memberProtocolType.isEmpty && memberProtocols.nonEmpty
else
protocolType.contains(memberProtocolType) && memberProtocols.exists(supportedProtocols(_) == members.size)
}
def updateMember(member: MemberMetadata,

6
core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala

@ -29,6 +29,10 @@ case class MemberSummary(memberId: String, @@ -29,6 +29,10 @@ case class MemberSummary(memberId: String,
metadata: Array[Byte],
assignment: Array[Byte])
private object MemberMetadata {
def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]) = supportedProtocols.map(_._1).toSet
}
/**
* Member metadata contains the following metadata:
*
@ -66,8 +70,6 @@ private[group] class MemberMetadata(val memberId: String, @@ -66,8 +70,6 @@ private[group] class MemberMetadata(val memberId: String,
var isLeaving: Boolean = false
var isNew: Boolean = false
def protocols = supportedProtocols.map(_._1).toSet
/**
* Get metadata corresponding to the provided protocol.
*/

4
core/src/main/scala/kafka/server/KafkaApis.scala

@ -1244,12 +1244,16 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1244,12 +1244,16 @@ class KafkaApis(val requestChannel: RequestChannel,
Collections.emptyMap())
)
} else {
// Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
val requireKnownMemberId = joinGroupRequest.version >= 4
// let the coordinator handle join-group
val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
(protocol.name, Utils.toArray(protocol.metadata))).toList
groupCoordinator.handleJoinGroup(
joinGroupRequest.groupId,
joinGroupRequest.memberId,
requireKnownMemberId,
request.header.clientId,
request.session.clientAddress.toString,
joinGroupRequest.rebalanceTimeout,

2
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala

@ -52,7 +52,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { @@ -52,7 +52,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)

2
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala

@ -159,7 +159,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest @@ -159,7 +159,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
callback
}
override def runWithCallback(member: GroupMember, responseCallback: JoinGroupCallback): Unit = {
groupCoordinator.handleJoinGroup(member.groupId, member.memberId, "clientId", "clientHost",
groupCoordinator.handleJoinGroup(member.groupId, member.memberId, requireKnownMemberId = false, "clientId", "clientHost",
DefaultRebalanceTimeout, DefaultSessionTimeout,
protocolType, protocols, responseCallback)
}

68
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

@ -126,7 +126,7 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -126,7 +126,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// JoinGroup
var joinGroupResponse: Option[JoinGroupResult] = None
groupCoordinator.handleJoinGroup(otherGroupId, memberId, "clientId", "clientHost", 60000, 10000, "consumer",
groupCoordinator.handleJoinGroup(otherGroupId, memberId, true, "clientId", "clientHost", 60000, 10000, "consumer",
List("range" -> new Array[Byte](0)), result => { joinGroupResponse = Some(result)})
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), joinGroupResponse.map(_.error))
@ -285,8 +285,7 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -285,8 +285,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.reset(replicaManager)
val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
rebalanceTimeout, sessionTimeout)
val responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, sessionTimeout, rebalanceTimeout)
assertFalse(responseFuture.isCompleted)
assertEquals(2, group.allMembers.size)
@ -315,6 +314,7 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -315,6 +314,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.reset(replicaManager)
val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
timer.advanceClock(GroupInitialRebalanceDelay + 1)
val joinGroupResult = await(joinGroupFuture, 1)
assertEquals(Errors.NONE, joinGroupResult.error)
@ -335,8 +335,39 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -335,8 +335,39 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
def testHeartbeatWrongCoordinator() {
def testJoinGroupUnknownConsumerDeadGroup() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val deadGroupId = "deadGroupId"
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
val joinGroupResult = joinGroup(deadGroupId, memberId, protocolType, protocols)
assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupResult.error)
}
@Test
def testJoinGroupSecondJoinInconsistentProtocol() {
var responseFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, requireKnownMemberId = true)
var joinGroupResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 1, TimeUnit.MILLISECONDS))
assertEquals(Errors.MEMBER_ID_REQUIRED, joinGroupResult.error)
val memberId = joinGroupResult.memberId
// Sending an inconsistent protocol shall be refused
EasyMock.reset(replicaManager)
responseFuture = sendJoinGroup(groupId, memberId, protocolType, List(), requireKnownMemberId = true)
joinGroupResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 1, TimeUnit.MILLISECONDS))
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
// Sending consistent protocol shall be accepted
EasyMock.reset(replicaManager)
responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, requireKnownMemberId = true)
timer.advanceClock(GroupInitialRebalanceDelay + 1)
joinGroupResult = Await.result(responseFuture, Duration(DefaultRebalanceTimeout + 1, TimeUnit.MILLISECONDS))
assertEquals(Errors.NONE, joinGroupResult.error)
}
@Test
def testHeartbeatWrongCoordinator() {
val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
assertEquals(Errors.NOT_COORDINATOR, heartbeatResult)
}
@ -1545,10 +1576,10 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -1545,10 +1576,10 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance() {
val rebalanceTimeout = GroupInitialRebalanceDelay * 3
val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
EasyMock.reset(replicaManager)
timer.advanceClock(GroupInitialRebalanceDelay - 1)
val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
EasyMock.reset(replicaManager)
timer.advanceClock(2)
@ -1569,12 +1600,12 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -1569,12 +1600,12 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def shouldDelayRebalanceUptoRebalanceTimeout() {
val rebalanceTimeout = GroupInitialRebalanceDelay * 2
val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
EasyMock.reset(replicaManager)
val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
timer.advanceClock(GroupInitialRebalanceDelay + 1)
EasyMock.reset(replicaManager)
val thirdMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
val thirdMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout = rebalanceTimeout)
timer.advanceClock(GroupInitialRebalanceDelay)
EasyMock.reset(replicaManager)
@ -1626,13 +1657,14 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -1626,13 +1657,14 @@ class GroupCoordinatorTest extends JUnitSuite {
memberId: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
sessionTimeout: Int = DefaultSessionTimeout,
rebalanceTimeout: Int = DefaultRebalanceTimeout,
sessionTimeout: Int = DefaultSessionTimeout): Future[JoinGroupResult] = {
requireKnownMemberId: Boolean = false): Future[JoinGroupResult] = {
val (responseFuture, responseCallback) = setupJoinGroupCallback
EasyMock.replay(replicaManager)
groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout,
groupCoordinator.handleJoinGroup(groupId, memberId, requireKnownMemberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout,
protocolType, protocols, responseCallback)
responseFuture
}
@ -1683,7 +1715,19 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -1683,7 +1715,19 @@ class GroupCoordinatorTest extends JUnitSuite {
protocols: List[(String, Array[Byte])],
sessionTimeout: Int = DefaultSessionTimeout,
rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, rebalanceTimeout, sessionTimeout)
val requireKnownMemberId = true
var responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
// Since member id is required, we need another bounce to get the successful join group result.
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID && requireKnownMemberId) {
val joinGroupResult = Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
// If some other error is triggered, return the error immediately for caller to handle.
if (joinGroupResult.error != Errors.MEMBER_ID_REQUIRED) {
return joinGroupResult
}
EasyMock.reset(replicaManager)
responseFuture = sendJoinGroup(groupId, joinGroupResult.memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout, requireKnownMemberId)
}
timer.advanceClock(GroupInitialRebalanceDelay + 1)
// should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))

15
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package kafka.coordinator.group
import kafka.admin.ConsumerGroupCommand.GroupState
import kafka.common.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
@ -238,16 +239,17 @@ class GroupMetadataTest extends JUnitSuite { @@ -238,16 +239,17 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSupportsProtocols() {
// by default, the group supports everything
assertTrue(group.supportsProtocols(Set("roundrobin", "range")))
assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "range")))
val memberId = "memberId"
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(member)
assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
assertTrue(group.supportsProtocols(Set("range", "foo")))
assertFalse(group.supportsProtocols(Set("foo", "bar")))
group.transitionTo(PreparingRebalance)
assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "foo")))
assertTrue(group.supportsProtocols(protocolType, Set("range", "foo")))
assertFalse(group.supportsProtocols(protocolType, Set("foo", "bar")))
val otherMemberId = "otherMemberId"
val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
@ -255,8 +257,9 @@ class GroupMetadataTest extends JUnitSuite { @@ -255,8 +257,9 @@ class GroupMetadataTest extends JUnitSuite {
group.add(otherMember)
assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
assertFalse(group.supportsProtocols(Set("range", "foo")))
assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "foo")))
assertFalse(group.supportsProtocols("invalid_type", Set("roundrobin", "foo")))
assertFalse(group.supportsProtocols(protocolType, Set("range", "foo")))
}
@Test

Loading…
Cancel
Save