Browse Source

KAFKA-14500; [2/N] Rewrite GroupMetadata in Java (#13663)

This patch introduces `GenericGroup` which rewrite the `GroupMetadata` in Java. The `GenericGroup` is basically a group using the current rebalance protocol in the new group coordinator.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
pull/13713/head
Jeff Kim 1 year ago committed by GitHub
parent
commit
cc011f77aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      checkstyle/import-control.xml
  2. 1011
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
  3. 136
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java
  4. 966
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java

1
checkstyle/import-control.xml

@ -221,6 +221,7 @@ @@ -221,6 +221,7 @@
<subpackage name="coordinator">
<subpackage name="group">
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol" />

1011
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java

File diff suppressed because it is too large Load Diff

136
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java

@ -0,0 +1,136 @@ @@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.generic;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* Represents all states that a generic group can be in, as well as the states that a group must
* be in to transition to a particular state.
*/
public enum GenericGroupState {
/**
* Group has no more members, but lingers until all offsets have expired. This state
* also represents groups which use Kafka only for offset commits and have no members.
*
* action: respond normally to join group from new members
* respond to sync group with UNKNOWN_MEMBER_ID
* respond to heartbeat with UNKNOWN_MEMBER_ID
* respond to leave group with UNKNOWN_MEMBER_ID
* respond to offset commit with UNKNOWN_MEMBER_ID
* allow offset fetch requests
* transition: last offsets removed in periodic expiration task => DEAD
* join group from a new member => PREPARING_REBALANCE
* group is removed by partition emigration => DEAD
* group is removed by expiration => DEAD
*/
EMPTY("Empty"),
/**
* Group is preparing to rebalance.
*
* action: respond to heartbeats with REBALANCE_IN_PROGRESS
* respond to sync group with REBALANCE_IN_PROGRESS
* remove member on leave group request
* park join group requests from new or existing members until all expected members have joined
* allow offset commits from previous generation
* allow offset fetch requests
* transition: some members have joined by the timeout => COMPLETING_REBALANCE
* all members have left the group => EMPTY
* group is removed by partition emigration => DEAD
*/
PREPARING_REBALANCE("PreparingRebalance"),
/**
* Group is awaiting state assignment from the leader.
*
* action: respond to heartbeats with REBALANCE_IN_PROGRESS
* respond to offset commits with REBALANCE_IN_PROGRESS
* park sync group requests from followers until transition to STABLE
* allow offset fetch requests
* transition: sync group with state assignment received from leader => STABLE
* join group from new member or existing member with updated metadata => PREPARING_REBALANCE
* leave group from existing member => PREPARING_REBALANCE
* member failure detected => PREPARING_REBALANCE
* group is removed by partition emigration => DEAD
*/
COMPLETING_REBALANCE("CompletingRebalance"),
/**
* Group is stable.
*
* action: respond to member heartbeats normally
* respond to sync group from any member with current assignment
* respond to join group from followers with matching metadata with current group metadata
* allow offset commits from member of current generation
* allow offset fetch requests
* transition: member failure detected via heartbeat => PREPARING_REBALANCE
* leave group from existing member => PREPARING_REBALANCE
* leader join-group received => PREPARING_REBALANCE
* follower join-group with new metadata => PREPARING_REBALANCE
* group is removed by partition emigration => DEAD
*/
STABLE("Stable"),
/**
* Group has no more members and its metadata is being removed.
*
* action: respond to join group with UNKNOWN_MEMBER_ID
* respond to sync group with UNKNOWN_MEMBER_ID
* respond to heartbeat with UNKNOWN_MEMBER_ID
* respond to leave group with UNKNOWN_MEMBER_ID
* respond to offset commit with UNKNOWN_MEMBER_ID
* allow offset fetch requests
* transition: DEAD is a final state before group metadata is cleaned up, so there are no transitions
*/
DEAD("Dead");
private final String name;
private Set<GenericGroupState> validPreviousStates;
static {
EMPTY.addValidPreviousStates(PREPARING_REBALANCE);
PREPARING_REBALANCE.addValidPreviousStates(STABLE, COMPLETING_REBALANCE, EMPTY);
COMPLETING_REBALANCE.addValidPreviousStates(PREPARING_REBALANCE);
STABLE.addValidPreviousStates(COMPLETING_REBALANCE);
DEAD.addValidPreviousStates(STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD);
}
GenericGroupState(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
private void addValidPreviousStates(GenericGroupState... validPreviousStates) {
this.validPreviousStates = new HashSet<>(Arrays.asList(validPreviousStates));
}
/**
* @return valid previous states a group must be in to transition to this state.
*/
public Set<GenericGroupState> validPreviousStates() {
return this.validPreviousStates;
}
}

966
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java

@ -0,0 +1,966 @@ @@ -0,0 +1,966 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GenericGroupTest {
private final String protocolType = "consumer";
private final String groupInstanceId = "groupInstanceId";
private final String memberId = "memberId";
private final String clientId = "clientId";
private final String clientHost = "clientHost";
private final int rebalanceTimeoutMs = 60000;
private final int sessionTimeoutMs = 10000;
private GenericGroup group = null;
@BeforeEach
public void initialize() {
group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM);
}
@Test
public void testCanRebalanceWhenStable() {
assertTrue(group.canRebalance());
}
@Test
public void testCanRebalanceWhenCompletingRebalance() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
assertTrue(group.canRebalance());
}
@Test
public void testCannotRebalanceWhenPreparingRebalance() {
group.transitionTo(PREPARING_REBALANCE);
assertFalse(group.canRebalance());
}
@Test
public void testCannotRebalanceWhenDead() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(EMPTY);
group.transitionTo(DEAD);
assertFalse(group.canRebalance());
}
@Test
public void testStableToPreparingRebalanceTransition() {
group.transitionTo(PREPARING_REBALANCE);
assertState(group, PREPARING_REBALANCE);
}
@Test
public void testStableToDeadTransition() {
group.transitionTo(DEAD);
assertState(group, DEAD);
}
@Test
public void testAwaitingRebalanceToPreparingRebalanceTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
group.transitionTo(PREPARING_REBALANCE);
assertState(group, PREPARING_REBALANCE);
}
@Test
public void testPreparingRebalanceToDeadTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(DEAD);
assertState(group, DEAD);
}
@Test
public void testPreparingRebalanceToEmptyTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(EMPTY);
assertState(group, EMPTY);
}
@Test
public void testEmptyToDeadTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(EMPTY);
group.transitionTo(DEAD);
assertState(group, DEAD);
}
@Test
public void testAwaitingRebalanceToStableTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
group.transitionTo(STABLE);
assertState(group, STABLE);
}
@Test
public void testEmptyToStableIllegalTransition() {
assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE));
}
@Test
public void testStableToStableIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
group.transitionTo(STABLE);
assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE));
}
@Test
public void testEmptyToAwaitingRebalanceIllegalTransition() {
assertThrows(IllegalStateException.class, () -> group.transitionTo(COMPLETING_REBALANCE));
}
@Test
public void testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
assertThrows(IllegalStateException.class, () -> group.transitionTo(PREPARING_REBALANCE));
}
@Test
public void testPreparingRebalanceToStableIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE));
}
@Test
public void testAwaitingRebalanceToAwaitingRebalanceIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
assertThrows(IllegalStateException.class, () -> group.transitionTo(COMPLETING_REBALANCE));
}
@Test
public void testDeadToDeadIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(DEAD);
group.transitionTo(DEAD);
assertState(group, DEAD);
}
@Test
public void testDeadToStableIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(DEAD);
assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE));
}
@Test
public void testDeadToPreparingRebalanceIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(DEAD);
assertThrows(IllegalStateException.class, () -> group.transitionTo(PREPARING_REBALANCE));
}
@Test
public void testDeadToAwaitingRebalanceIllegalTransition() {
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(DEAD);
assertThrows(IllegalStateException.class, () -> group.transitionTo(COMPLETING_REBALANCE));
}
@Test
public void testSelectProtocol() {
List<Protocol> member1Protocols = Arrays.asList(
new Protocol("range", new byte[0]),
new Protocol("roundrobin", new byte[0])
);
GenericGroupMember member1 = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
member1Protocols
);
group.add(member1);
List<Protocol> member2Protocols = Arrays.asList(
new Protocol("roundrobin", new byte[0]),
new Protocol("range", new byte[0])
);
GenericGroupMember member2 = new GenericGroupMember(
"member2",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
member2Protocols
);
group.add(member2);
// now could be either range or robin since there is no majority preference
assertTrue(group.selectProtocol().equals("range") ||
group.selectProtocol().equals("roundrobin"));
GenericGroupMember member3 = new GenericGroupMember(
"member3",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
member2Protocols
);
group.add(member3);
// now we should prefer 'roundrobin'
assertEquals("roundrobin", group.selectProtocol());
}
@Test
public void testSelectProtocolRaisesIfNoMembers() {
assertThrows(IllegalStateException.class, () -> group.selectProtocol());
}
@Test
public void testSelectProtocolChoosesCompatibleProtocol() {
List<Protocol> member1Protocols = Arrays.asList(
new Protocol("range", new byte[0]),
new Protocol("roundrobin", new byte[0])
);
GenericGroupMember member1 = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
member1Protocols
);
group.add(member1);
List<Protocol> member2Protocols = Arrays.asList(
new Protocol("roundrobin", new byte[0]),
new Protocol("blah", new byte[0])
);
GenericGroupMember member2 = new GenericGroupMember(
"member2",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
member2Protocols
);
group.add(member2);
assertEquals("roundrobin", group.selectProtocol());
}
@Test
public void testSupportsProtocols() {
List<Protocol> member1Protocols = Arrays.asList(
new Protocol("range", new byte[0]),
new Protocol("roundrobin", new byte[0])
);
GenericGroupMember member1 = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
member1Protocols
);
// by default, the group supports everything
assertTrue(group.supportsProtocols(protocolType, mkSet("range", "roundrobin")));
group.add(member1);
group.transitionTo(PREPARING_REBALANCE);
assertTrue(group.supportsProtocols(protocolType, mkSet("roundrobin", "foo")));
assertTrue(group.supportsProtocols(protocolType, mkSet("range", "bar")));
assertFalse(group.supportsProtocols(protocolType, mkSet("foo", "bar")));
}
@Test
public void testSubscribedTopics() {
// not able to compute it for a newly created group
assertEquals(Optional.empty(), group.subscribedTopics());
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"range",
ConsumerProtocol.serializeSubscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("foo")
)
).array()
)
)
);
group.transitionTo(PREPARING_REBALANCE);
group.add(member);
group.initNextGeneration();
Set<String> expectedTopics = new HashSet<>(Collections.singleton("foo"));
assertEquals(expectedTopics, group.subscribedTopics().get());
group.transitionTo(PREPARING_REBALANCE);
group.remove(memberId);
group.initNextGeneration();
assertEquals(Optional.of(Collections.emptySet()), group.subscribedTopics());
GenericGroupMember memberWithFaultyProtocol = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"range",
new byte[0]
)
)
);
group.transitionTo(PREPARING_REBALANCE);
group.add(memberWithFaultyProtocol);
group.initNextGeneration();
assertEquals(Optional.empty(), group.subscribedTopics());
}
@Test
public void testSubscribedTopicsNonConsumerGroup() {
// not able to compute it for a newly created group
assertEquals(Optional.empty(), group.subscribedTopics());
GenericGroupMember memberWithNonConsumerProtocol = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
"My Protocol",
Collections.singletonList(
new Protocol(
"range",
new byte[0]
)
)
);
group.transitionTo(PREPARING_REBALANCE);
group.add(memberWithNonConsumerProtocol);
group.initNextGeneration();
assertEquals(Optional.empty(), group.subscribedTopics());
}
@Test
public void testInitNextGeneration() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.transitionTo(PREPARING_REBALANCE);
group.add(member, new CompletableFuture<>());
assertEquals(0, group.generationId());
assertNull(group.protocolName().orElse(null));
group.initNextGeneration();
assertEquals(1, group.generationId());
assertEquals("roundrobin", group.protocolName().orElse(null));
}
@Test
public void testInitNextGenerationEmptyGroup() {
assertEquals(EMPTY, group.currentState());
assertEquals(0, group.generationId());
assertNull(group.protocolName().orElse(null));
group.transitionTo(PREPARING_REBALANCE);
group.initNextGeneration();
assertEquals(1, group.generationId());
assertNull(group.protocolName().orElse(null));
}
@Test
public void testUpdateMember() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
List<Protocol> newProtocols = Arrays.asList(
new Protocol(
"range",
new byte[0]
),
new Protocol(
"roundrobin",
new byte[0]
)
);
int newRebalanceTimeoutMs = 120000;
int newSessionTimeoutMs = 20000;
group.updateMember(member, newProtocols, newRebalanceTimeoutMs, newSessionTimeoutMs, null);
assertEquals(group.rebalanceTimeoutMs(), newRebalanceTimeoutMs);
assertEquals(member.sessionTimeoutMs(), newSessionTimeoutMs);
assertEquals(newProtocols, member.supportedProtocols());
}
@Test
public void testReplaceGroupInstanceWithNonExistingMember() {
String newMemberId = "newMemberId";
assertThrows(IllegalArgumentException.class, () ->
group.replaceStaticMember(groupInstanceId, memberId, newMemberId));
}
@Test
public void testReplaceGroupInstance() throws Exception {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.of(groupInstanceId),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
CompletableFuture<JoinGroupResponseData> joinGroupFuture = new CompletableFuture<>();
group.add(member, joinGroupFuture);
CompletableFuture<SyncGroupResponseData> syncGroupFuture = new CompletableFuture<>();
member.setAwaitingSyncFuture(syncGroupFuture);
assertTrue(group.isLeader(memberId));
assertEquals(memberId, group.staticMemberId(groupInstanceId));
String newMemberId = "newMemberId";
group.replaceStaticMember(groupInstanceId, memberId, newMemberId);
assertTrue(group.isLeader(newMemberId));
assertEquals(newMemberId, group.staticMemberId(groupInstanceId));
assertEquals(Errors.FENCED_INSTANCE_ID.code(), joinGroupFuture.get().errorCode());
assertEquals(Errors.FENCED_INSTANCE_ID.code(), syncGroupFuture.get().errorCode());
assertFalse(member.isAwaitingJoin());
assertFalse(member.isAwaitingSync());
}
@Test
public void testCompleteJoinFuture() throws Exception {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
CompletableFuture<JoinGroupResponseData> joinGroupFuture = new CompletableFuture<>();
group.add(member, joinGroupFuture);
assertTrue(group.hasAllMembersJoined());
assertTrue(
group.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(Errors.NONE.code()))
);
assertEquals(Errors.NONE.code(), joinGroupFuture.get().errorCode());
assertEquals(memberId, joinGroupFuture.get().memberId());
assertFalse(member.isAwaitingJoin());
assertEquals(0, group.numAwaitingJoinResponse());
}
@Test
public void testNotCompleteJoinFuture() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertFalse(member.isAwaitingJoin());
assertFalse(
group.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(Errors.NONE.code()))
);
assertFalse(member.isAwaitingJoin());
}
@Test
public void testCompleteSyncFuture() throws Exception {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
CompletableFuture<SyncGroupResponseData> syncGroupFuture = new CompletableFuture<>();
member.setAwaitingSyncFuture(syncGroupFuture);
assertTrue(group.completeSyncFuture(member, new SyncGroupResponseData()
.setErrorCode(Errors.NONE.code())));
assertEquals(0, group.numAwaitingJoinResponse());
assertFalse(member.isAwaitingSync());
assertEquals(Errors.NONE.code(), syncGroupFuture.get().errorCode());
}
@Test
public void testNotCompleteSyncFuture() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertFalse(member.isAwaitingSync());
assertFalse(group.completeSyncFuture(member, new SyncGroupResponseData()
.setErrorCode(Errors.NONE.code())));
assertFalse(member.isAwaitingSync());
}
@Test
public void testCannotAddPendingMemberIfStable() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertThrows(IllegalStateException.class, () -> group.addPendingMember(memberId));
}
@Test
public void testRemovalFromPendingAfterMemberIsStable() {
group.addPendingMember(memberId);
assertFalse(group.hasMemberId(memberId));
assertTrue(group.isPendingMember(memberId));
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertTrue(group.hasMemberId(memberId));
assertFalse(group.isPendingMember(memberId));
}
@Test
public void testRemovalFromPendingWhenMemberIsRemoved() {
group.addPendingMember(memberId);
assertFalse(group.hasMemberId(memberId));
assertTrue(group.isPendingMember(memberId));
group.remove(memberId);
assertFalse(group.hasMemberId(memberId));
assertFalse(group.isPendingMember(memberId));
}
@Test
public void testCannotAddStaticMemberIfAlreadyPresent() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.of(groupInstanceId),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertTrue(group.hasMemberId(memberId));
assertTrue(group.hasStaticMember(groupInstanceId));
// We aren ot permitted to add the member again if it is already present
assertThrows(IllegalStateException.class, () -> group.add(member));
}
@Test
public void testCannotAddPendingSyncOfUnknownMember() {
assertThrows(IllegalStateException.class,
() -> group.addPendingSyncMember(memberId));
}
@Test
public void testCannotRemovePendingSyncOfUnknownMember() {
assertThrows(IllegalStateException.class,
() -> group.removePendingSyncMember(memberId));
}
@Test
public void testCanAddAndRemovePendingSyncMember() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertTrue(group.addPendingSyncMember(memberId));
assertEquals(Collections.singleton(memberId), group.allPendingSyncMembers());
group.removePendingSyncMember(memberId);
assertEquals(Collections.emptySet(), group.allPendingSyncMembers());
}
@Test
public void testRemovalFromPendingSyncWhenMemberIsRemoved() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.of(groupInstanceId),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
assertTrue(group.addPendingSyncMember(memberId));
assertEquals(Collections.singleton(memberId), group.allPendingSyncMembers());
group.remove(memberId);
assertEquals(Collections.emptySet(), group.allPendingSyncMembers());
}
@Test
public void testNewGenerationClearsPendingSyncMembers() {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(member);
group.transitionTo(PREPARING_REBALANCE);
assertTrue(group.addPendingSyncMember(memberId));
assertEquals(Collections.singleton(memberId), group.allPendingSyncMembers());
group.initNextGeneration();
assertEquals(Collections.emptySet(), group.allPendingSyncMembers());
}
@Test
public void testElectNewJoinedLeader() {
GenericGroupMember leader = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(leader);
assertTrue(group.isLeader(memberId));
assertFalse(leader.isAwaitingJoin());
GenericGroupMember newLeader = new GenericGroupMember(
"new-leader",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(newLeader, new CompletableFuture<>());
GenericGroupMember newMember = new GenericGroupMember(
"new-member",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(newMember);
assertTrue(group.maybeElectNewJoinedLeader());
assertTrue(group.isLeader("new-leader"));
}
@Test
public void testMaybeElectNewJoinedLeaderChooseExisting() {
GenericGroupMember leader = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(leader, new CompletableFuture<>());
assertTrue(group.isLeader(memberId));
assertTrue(leader.isAwaitingJoin());
GenericGroupMember newMember = new GenericGroupMember(
"new-member",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(newMember);
assertTrue(group.maybeElectNewJoinedLeader());
assertTrue(group.isLeader(memberId));
}
private void assertState(GenericGroup group, GenericGroupState targetState) {
Set<GenericGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);
otherStates.add(PREPARING_REBALANCE);
otherStates.add(COMPLETING_REBALANCE);
otherStates.add(DEAD);
otherStates.remove(targetState);
otherStates.forEach(otherState -> assertFalse(group.isInState(otherState)));
assertTrue(group.isInState(targetState));
}
}
Loading…
Cancel
Save