Browse Source

KAFKA-14462; [22/N] Implement session and revocation timeouts (#13963)

This patch adds the session timeout and the revocation timeout to the new consumer group protocol.

Reviewers: Calvin Liu <caliu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
pull/13991/head
David Jacot 1 year ago committed by GitHub
parent
commit
aafbe34443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 244
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  2. 1
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
  3. 11
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
  4. 4
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
  5. 652
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  6. 173
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java
  7. 21
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java

244
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException; @@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@ -50,6 +51,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -50,6 +51,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
@ -69,6 +71,7 @@ import java.util.Map; @@ -69,6 +71,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -98,6 +101,7 @@ public class GroupMetadataManager { @@ -98,6 +101,7 @@ public class GroupMetadataManager {
private CoordinatorTimer<Record> timer = null;
private List<PartitionAssignor> assignors = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupSessionTimeoutMs = 45000;
private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
private MetadataImage metadataImage = null;
@ -132,6 +136,11 @@ public class GroupMetadataManager { @@ -132,6 +136,11 @@ public class GroupMetadataManager {
return this;
}
Builder withConsumerGroupSessionTimeout(int consumerGroupSessionTimeoutMs) {
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
return this;
}
Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
return this;
@ -166,6 +175,7 @@ public class GroupMetadataManager { @@ -166,6 +175,7 @@ public class GroupMetadataManager {
assignors,
metadataImage,
consumerGroupMaxSize,
consumerGroupSessionTimeoutMs,
consumerGroupHeartbeatIntervalMs,
consumerGroupMetadataRefreshIntervalMs
);
@ -222,6 +232,11 @@ public class GroupMetadataManager { @@ -222,6 +232,11 @@ public class GroupMetadataManager {
*/
private final int consumerGroupHeartbeatIntervalMs;
/**
* The session timeout for consumer groups.
*/
private final int consumerGroupSessionTimeoutMs;
/**
* The metadata refresh interval.
*/
@ -240,6 +255,7 @@ public class GroupMetadataManager { @@ -240,6 +255,7 @@ public class GroupMetadataManager {
List<PartitionAssignor> assignors,
MetadataImage metadataImage,
int consumerGroupMaxSize,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMetadataRefreshIntervalMs
) {
@ -253,6 +269,7 @@ public class GroupMetadataManager { @@ -253,6 +269,7 @@ public class GroupMetadataManager {
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
}
@ -549,7 +566,7 @@ public class GroupMetadataManager { @@ -549,7 +566,7 @@ public class GroupMetadataManager {
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
if (memberEpoch == 0) {
log.info("[GroupId " + groupId + "] Member " + memberId + " joins the consumer group.");
log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
}
// 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
@ -575,14 +592,14 @@ public class GroupMetadataManager { @@ -575,14 +592,14 @@ public class GroupMetadataManager {
records.add(newMemberSubscriptionRecord(groupId, updatedMember));
if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
updatedMember.subscribedTopicNames());
log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.",
groupId, memberId, updatedMember.subscribedTopicNames());
bumpGroupEpoch = true;
}
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
updatedMember.subscribedTopicRegex());
log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.",
groupId, memberId, updatedMember.subscribedTopicRegex());
bumpGroupEpoch = true;
}
}
@ -598,8 +615,8 @@ public class GroupMetadataManager { @@ -598,8 +615,8 @@ public class GroupMetadataManager {
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ subscriptionMetadata + ".");
log.info("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
bumpGroupEpoch = true;
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
@ -607,7 +624,7 @@ public class GroupMetadataManager { @@ -607,7 +624,7 @@ public class GroupMetadataManager {
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
}
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
@ -632,15 +649,16 @@ public class GroupMetadataManager { @@ -632,15 +649,16 @@ public class GroupMetadataManager {
.addOrUpdateMember(memberId, updatedMember)
.build();
log.info("[GroupId " + groupId + "] Computed a new target assignment for epoch " + groupEpoch + ": "
+ assignmentResult.targetAssignment() + ".");
log.info("[GroupId {}] Computed a new target assignment for epoch {}: {}.",
groupId, groupEpoch, assignmentResult.targetAssignment());
records.addAll(assignmentResult.records());
targetAssignment = assignmentResult.targetAssignment().get(memberId);
targetAssignmentEpoch = groupEpoch;
} catch (PartitionAssignorException ex) {
String msg = "Failed to compute a new target assignment for epoch " + groupEpoch + ": " + ex + ".";
log.error("[GroupId " + groupId + "] " + msg);
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
log.error("[GroupId {}] {}.", groupId, msg);
throw new UnknownServerException(msg, ex);
}
}
@ -662,14 +680,23 @@ public class GroupMetadataManager { @@ -662,14 +680,23 @@ public class GroupMetadataManager {
assignmentUpdated = true;
records.add(newCurrentAssignmentRecord(groupId, updatedMember));
log.info("[GroupId " + groupId + "] Member " + memberId + " transitioned from " +
member.currentAssignmentSummary() + " to " + updatedMember.currentAssignmentSummary() + ".");
// TODO(dajac) Starts or restarts the timer for the revocation timeout.
log.info("[GroupId {}] Member {} transitioned from {} to {}.",
groupId, memberId, member.currentAssignmentSummary(), updatedMember.currentAssignmentSummary());
if (updatedMember.state() == ConsumerGroupMember.MemberState.REVOKING) {
scheduleConsumerGroupRevocationTimeout(
groupId,
memberId,
updatedMember.rebalanceTimeoutMs(),
updatedMember.memberEpoch()
);
} else {
cancelConsumerGroupRevocationTimeout(groupId, memberId);
}
}
}
// TODO(dajac) Starts or restarts the timer for the session timeout.
scheduleConsumerGroupSessionTimeout(groupId, memberId);
// Prepare the response.
ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
@ -700,17 +727,35 @@ public class GroupMetadataManager { @@ -700,17 +727,35 @@ public class GroupMetadataManager {
String groupId,
String memberId
) throws ApiException {
List<Record> records = new ArrayList<>();
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
List<Record> records = consumerGroupFenceMember(group, member);
return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(-1));
}
/**
* Fences a member from a consumer group.
*
* @param group The group.
* @param member The member.
*
* @return A list of records to be applied to the state.
*/
private List<Record> consumerGroupFenceMember(
ConsumerGroup group,
ConsumerGroupMember member
) {
List<Record> records = new ArrayList<>();
// Write tombstones for the member. The order matters here.
records.add(newCurrentAssignmentTombstoneRecord(groupId, memberId));
records.add(newTargetAssignmentTombstoneRecord(groupId, memberId));
records.add(newMemberSubscriptionTombstoneRecord(groupId, memberId));
records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), member.memberId()));
records.add(newTargetAssignmentTombstoneRecord(group.groupId(), member.memberId()));
records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), member.memberId()));
// We update the subscription metadata without the leaving member.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
@ -720,19 +765,118 @@ public class GroupMetadataManager { @@ -720,19 +765,118 @@ public class GroupMetadataManager {
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ subscriptionMetadata + ".");
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
log.info("[GroupId {}] Computed new subscription metadata: {}.",
group.groupId(), subscriptionMetadata);
records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
}
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(-1)
);
// Cancel all the timers of the member.
cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
cancelConsumerGroupRevocationTimeout(group.groupId(), member.memberId());
return records;
}
/**
* Schedules (or reschedules) the session timeout for the member.
*
* @param groupId The group id.
* @param memberId The member id.
*/
private void scheduleConsumerGroupSessionTimeout(
String groupId,
String memberId
) {
String key = consumerGroupSessionTimeoutKey(groupId, memberId);
timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
try {
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId {}] Member {} fenced from the group because its session expired.",
groupId, memberId);
return consumerGroupFenceMember(group, member);
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
groupId, memberId);
} catch (UnknownMemberIdException ex) {
log.debug("[GroupId {}] Could not fence {} because the member does not exist.",
groupId, memberId);
}
return Collections.emptyList();
});
}
/**
* Cancels the session timeout of the member.
*
* @param groupId The group id.
* @param memberId The member id.
*/
private void cancelConsumerGroupSessionTimeout(
String groupId,
String memberId
) {
timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
}
/**
* Schedules a revocation timeout for the member.
*
* @param groupId The group id.
* @param memberId The member id.
* @param revocationTimeoutMs The revocation timeout.
* @param expectedMemberEpoch The expected member epoch.
*/
private void scheduleConsumerGroupRevocationTimeout(
String groupId,
String memberId,
long revocationTimeoutMs,
int expectedMemberEpoch
) {
String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
try {
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
member.memberEpoch() != expectedMemberEpoch) {
log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " +
"state does not match the expected state.", groupId, memberId);
return Collections.emptyList();
}
log.info("[GroupId {}] Member {} fenced from the group because " +
"it failed to revoke partitions within {}ms.", groupId, memberId, revocationTimeoutMs);
return consumerGroupFenceMember(group, member);
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Could not fence {}} because the group does not exist.",
groupId, memberId);
} catch (UnknownMemberIdException ex) {
log.debug("[GroupId {}] Could not fence {} because the member does not exist.",
groupId, memberId);
}
return Collections.emptyList();
});
}
/**
* Cancels the revocation timeout of the member.
*
* @param groupId The group id.
* @param memberId The member id.
*/
private void cancelConsumerGroupRevocationTimeout(
String groupId,
String memberId
) {
timer.cancel(consumerGroupRevocationTimeoutKey(groupId, memberId));
}
/**
@ -1059,4 +1203,44 @@ public class GroupMetadataManager { @@ -1059,4 +1203,44 @@ public class GroupMetadataManager {
}
});
}
/**
* The coordinator has been loaded. Session timeouts are registered
* for all members.
*/
public void onLoaded() {
groups.forEach((groupId, group) -> {
switch (group.type()) {
case CONSUMER:
ConsumerGroup consumerGroup = (ConsumerGroup) group;
log.info("Loaded consumer group {} with {} members.", groupId, consumerGroup.members().size());
consumerGroup.members().forEach((memberId, member) -> {
log.debug("Loaded member {} in consumer group {}.", memberId, groupId);
scheduleConsumerGroupSessionTimeout(groupId, memberId);
if (member.state() == ConsumerGroupMember.MemberState.REVOKING) {
scheduleConsumerGroupRevocationTimeout(
groupId,
memberId,
member.rebalanceTimeoutMs(),
member.memberEpoch()
);
}
});
break;
case GENERIC:
GenericGroup genericGroup = (GenericGroup) group;
log.info("Loaded generic group {} with {} members.", groupId, genericGroup.allMembers().size());
break;
}
});
}
public static String consumerGroupSessionTimeoutKey(String groupId, String memberId) {
return "session-timeout-" + groupId + "-" + memberId;
}
public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) {
return "revocation-timeout-" + groupId + "-" + memberId;
}
}

1
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java

@ -167,6 +167,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> { @@ -167,6 +167,7 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
@Override
public void onLoaded(MetadataImage newImage) {
groupMetadataManager.onNewMetadataImage(newImage, new MetadataDelta(newImage));
groupMetadataManager.onLoaded();
}
/**

11
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java

@ -51,6 +51,17 @@ public class CoordinatorResult<T, U> { @@ -51,6 +51,17 @@ public class CoordinatorResult<T, U> {
this.response = response;
}
/**
* Constructs a Result with records and a response.
*
* @param records A non-null list of records.
*/
public CoordinatorResult(
List<U> records
) {
this(records, null);
}
/**
* @return The list of records.
*/

4
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java

@ -280,8 +280,8 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos @@ -280,8 +280,8 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
return new CoordinatorResult<>(operation.generateRecords(), null);
});
// If the write event fails, it is rescheduled with a small backoff except if the
// error is fatal.
// If the write event fails, it is rescheduled with a small backoff except if retry
// is disabled or if the error is fatal.
event.future.exceptionally(ex -> {
if (ex instanceof RejectedExecutionException) {
log.debug("The write event {} for the timer {} was not executed because it was " +

652
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

@ -39,7 +39,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -39,7 +39,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@ -62,7 +61,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -62,7 +61,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@ -83,16 +81,18 @@ import java.util.List; @@ -83,16 +81,18 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRevocationTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -100,15 +100,6 @@ import static org.mockito.Mockito.mock; @@ -100,15 +100,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GroupMetadataManagerTest {
// Timer is not used yet so an empty mock is fine for now.
static class MockCoordinatorTimer implements CoordinatorTimer<Record> {
@Override
public void schedule(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation<Record> operation) {}
@Override
public void cancel(String key) {}
}
static class MockPartitionAssignor implements PartitionAssignor {
private final String name;
private GroupAssignment prepareGroupAssignment = null;
@ -239,7 +230,8 @@ public class GroupMetadataManagerTest { @@ -239,7 +230,8 @@ public class GroupMetadataManagerTest {
static class GroupMetadataManagerTestContext {
static class Builder {
final private Time time = new MockTime();
final private MockTime time = new MockTime();
final private MockCoordinatorTimer<Record> timer = new MockCoordinatorTimer<>(time);
final private LogContext logContext = new LogContext();
final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
private MetadataImage metadataImage;
@ -279,14 +271,16 @@ public class GroupMetadataManagerTest { @@ -279,14 +271,16 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(
time,
timer,
snapshotRegistry,
new GroupMetadataManager.Builder()
.withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext)
.withTime(time)
.withTimer(new MockCoordinatorTimer())
.withTimer(timer)
.withMetadataImage(metadataImage)
.withConsumerGroupHeartbeatInterval(5000)
.withConsumerGroupSessionTimeout(45000)
.withConsumerGroupMaxSize(consumerGroupMaxSize)
.withAssignors(assignors)
.withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs)
@ -303,7 +297,8 @@ public class GroupMetadataManagerTest { @@ -303,7 +297,8 @@ public class GroupMetadataManagerTest {
}
}
final Time time;
final MockTime time;
final MockCoordinatorTimer<Record> timer;
final SnapshotRegistry snapshotRegistry;
final GroupMetadataManager groupMetadataManager;
@ -311,11 +306,13 @@ public class GroupMetadataManagerTest { @@ -311,11 +306,13 @@ public class GroupMetadataManagerTest {
long lastWrittenOffset = 0L;
public GroupMetadataManagerTestContext(
Time time,
MockTime time,
MockCoordinatorTimer<Record> timer,
SnapshotRegistry snapshotRegistry,
GroupMetadataManager groupMetadataManager
) {
this.time = time;
this.timer = timer;
this.snapshotRegistry = snapshotRegistry;
this.groupMetadataManager = groupMetadataManager;
}
@ -379,6 +376,55 @@ public class GroupMetadataManagerTest { @@ -379,6 +376,55 @@ public class GroupMetadataManagerTest {
return result;
}
public List<MockCoordinatorTimer.ExpiredTimeout<Record>> sleep(long ms) {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Record>> timeouts = timer.poll();
timeouts.forEach(timeout -> timeout.records.forEach(this::replay));
return timeouts;
}
public MockCoordinatorTimer.ScheduledTimeout<Record> assertSessionTimeout(
String groupId,
String memberId,
long delayMs
) {
MockCoordinatorTimer.ScheduledTimeout<Record> timeout =
timer.timeout(consumerGroupSessionTimeoutKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
return timeout;
}
public void assertNoSessionTimeout(
String groupId,
String memberId
) {
MockCoordinatorTimer.ScheduledTimeout<Record> timeout =
timer.timeout(consumerGroupSessionTimeoutKey(groupId, memberId));
assertNull(timeout);
}
public MockCoordinatorTimer.ScheduledTimeout<Record> assertRevocationTimeout(
String groupId,
String memberId,
long delayMs
) {
MockCoordinatorTimer.ScheduledTimeout<Record> timeout =
timer.timeout(consumerGroupRevocationTimeoutKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
return timeout;
}
public void assertNoRevocationTimeout(
String groupId,
String memberId
) {
MockCoordinatorTimer.ScheduledTimeout<Record> timeout =
timer.timeout(consumerGroupRevocationTimeoutKey(groupId, memberId));
assertNull(timeout);
}
private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@ -2402,6 +2448,580 @@ public class GroupMetadataManagerTest { @@ -2402,6 +2448,580 @@ public class GroupMetadataManagerTest {
assertEquals(image, context.groupMetadataManager.image());
}
@Test
public void testSessionTimeoutLifecycle() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
// Session timer is scheduled on first heartbeat.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertEquals(1, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time.
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Session timer is rescheduled on second heartbeat.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().memberEpoch()));
assertEquals(1, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time.
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Session timer is cancelled on leave.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(-1));
assertEquals(-1, result.response().memberEpoch());
// Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId);
context.assertNoRevocationTimeout(groupId, memberId);
}
@Test
public void testSessionTimeoutExpiration() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)))
));
// Session timer is scheduled on first heartbeat.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertEquals(1, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time past the session timeout.
List<MockCoordinatorTimer.ExpiredTimeout<Record>> timeouts = context.sleep(45000 + 1);
// Verify the expired timeout.
assertEquals(
Collections.singletonList(new MockCoordinatorTimer.ExpiredTimeout<Record>(
consumerGroupSessionTimeoutKey(groupId, memberId),
Arrays.asList(
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId),
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()),
RecordHelpers.newGroupEpochRecord(groupId, 2)
)
)),
timeouts
);
// Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId);
context.assertNoRevocationTimeout(groupId, memberId);
}
@Test
public void testRevocationTimeoutLifecycle() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
String memberId3 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 3)
.build())
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
}
}
));
// Member 1 joins the group.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(180000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1, 2))))),
result.response()
);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Prepare next assignment.
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
put(memberId2, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 2)
)));
}
}
));
// Member 2 joins the group.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setPendingTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(2))))),
result.response()
);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Member 1 heartbeats and transitions to revoking. The revocation timeout
// is scheduled.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
.setRebalanceTimeoutMs(12000)
.setSubscribedTopicNames(Collections.singletonList("foo")));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1))))),
result.response()
);
// Verify that there is a revocation timeout.
context.assertRevocationTimeout(groupId, memberId1, 12000);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Prepare next assignment.
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
put(memberId2, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 2)
)));
put(memberId3, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1)
)));
}
}
));
// Member 3 joins the group.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId3)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId3)
.setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setPendingTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(1))))),
result.response()
);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Member 1 heartbeats and re-transitions to revoking. The revocation timeout
// is re-scheduled.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(Collections.singletonList("foo")));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0))))),
result.response()
);
// Verify that there is a revocation timeout. Keep a reference
// to the timeout for later.
MockCoordinatorTimer.ScheduledTimeout<Record> scheduledTimeout =
context.assertRevocationTimeout(groupId, memberId1, 90000);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Member 1 acks the revocation. The revocation timeout is cancelled.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1)
.setTopicPartitions(Collections.singletonList(new ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Collections.singletonList(0)))));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0))))),
result.response()
);
// Verify that there is not revocation timeout.
context.assertNoRevocationTimeout(groupId, memberId1);
// Execute the scheduled revocation timeout captured earlier to simulate a
// stale timeout. This should be a no-op.
assertEquals(Collections.emptyList(), scheduledTimeout.operation.generateRecords());
}
@Test
public void testRevocationTimeoutExpiration() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 3)
.build())
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)));
}
}
));
// Member 1 joins the group.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(10000) // Use timeout smaller than session timeout.
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1, 2))))),
result.response()
);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Prepare next assignment.
assignor.prepareGroupAssignment(new GroupAssignment(
new HashMap<String, MemberAssignment>() {
{
put(memberId1, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)
)));
put(memberId2, new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 2)
)));
}
}
));
// Member 2 joins the group.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(10000)
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setTopicPartitions(Collections.emptyList()));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setPendingTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(2))))),
result.response()
);
assertEquals(
Collections.emptyList(),
context.sleep(result.response().heartbeatIntervalMs())
);
// Member 1 heartbeats and transitions to revoking. The revocation timeout
// is scheduled.
result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(1));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1))))),
result.response()
);
// Advance time past the revocation timeout.
List<MockCoordinatorTimer.ExpiredTimeout<Record>> timeouts = context.sleep(10000 + 1);
// Verify the expired timeout.
assertEquals(
Collections.singletonList(new MockCoordinatorTimer.ExpiredTimeout<Record>(
consumerGroupRevocationTimeoutKey(groupId, memberId1),
Arrays.asList(
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
RecordHelpers.newGroupEpochRecord(groupId, 3)
)
)),
timeouts
);
// Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId1);
context.assertNoRevocationTimeout(groupId, memberId1);
}
@Test
public void testOnLoaded() {
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
.withConsumerGroup(new ConsumerGroupBuilder("foo", 10)
.withMember(new ConsumerGroupMember.Builder("foo-1")
.setMemberEpoch(9)
.setPreviousMemberEpoch(9)
.setTargetMemberEpoch(10)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.setPartitionsPendingRevocation(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build())
.withMember(new ConsumerGroupMember.Builder("foo-2")
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setTargetMemberEpoch(10)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo"))
.setServerAssignorName("range")
.setPartitionsPendingAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build())
.withAssignment("foo-1", mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignment("foo-2", mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.withAssignmentEpoch(10))
.build();
// Let's assume that all the records have been replayed and now
// onLoaded is called to signal it.
context.groupMetadataManager.onLoaded();
// All members should have a session timeout in place.
assertNotNull(context.timer.timeout(consumerGroupSessionTimeoutKey("foo", "foo-1")));
assertNotNull(context.timer.timeout(consumerGroupSessionTimeoutKey("foo", "foo-2")));
// foo-1 should also have a revocation timeout in place.
assertNotNull(context.timer.timeout(consumerGroupRevocationTimeoutKey("foo", "foo-1")));
}
private <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual

173
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java

@ -0,0 +1,173 @@ @@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
/**
* A simple mock for the {@link CoordinatorTimer}. The mock does not automatically
* expire timeouts. They are only expired when {@link MockCoordinatorTimer#poll()}
* is called.
*/
public class MockCoordinatorTimer<T> implements CoordinatorTimer<T> {
/**
* Represents a scheduled timeout.
*/
public static class ScheduledTimeout<T> {
public final String key;
public final long deadlineMs;
public final TimeoutOperation<T> operation;
ScheduledTimeout(
String key,
long deadlineMs,
TimeoutOperation<T> operation
) {
this.key = key;
this.deadlineMs = deadlineMs;
this.operation = operation;
}
}
/**
* Represents an expired timeout.
*/
public static class ExpiredTimeout<T> {
public final String key;
public final List<T> records;
ExpiredTimeout(
String key,
List<T> records
) {
this.key = key;
this.records = records;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExpiredTimeout<?> that = (ExpiredTimeout<?>) o;
if (!Objects.equals(key, that.key)) return false;
return Objects.equals(records, that.records);
}
@Override
public int hashCode() {
int result = key != null ? key.hashCode() : 0;
result = 31 * result + (records != null ? records.hashCode() : 0);
return result;
}
}
private final Time time;
private final Map<String, ScheduledTimeout<T>> timeoutMap = new HashMap<>();
private final PriorityQueue<ScheduledTimeout<T>> timeoutQueue = new PriorityQueue<>(
Comparator.comparingLong(entry -> entry.deadlineMs)
);
public MockCoordinatorTimer(Time time) {
this.time = time;
}
/**
* Schedules a timeout.
*/
@Override
public void schedule(
String key,
long delay,
TimeUnit unit,
boolean retry,
TimeoutOperation<T> operation
) {
cancel(key);
long deadlineMs = time.milliseconds() + unit.toMillis(delay);
ScheduledTimeout<T> timeout = new ScheduledTimeout<>(key, deadlineMs, operation);
timeoutQueue.add(timeout);
timeoutMap.put(key, timeout);
}
/**
* Cancels a timeout.
*/
@Override
public void cancel(String key) {
ScheduledTimeout<T> timeout = timeoutMap.remove(key);
if (timeout != null) {
timeoutQueue.remove(timeout);
}
}
/**
* @return True if a timeout with the key exists; false otherwise.
*/
public boolean contains(String key) {
return timeoutMap.containsKey(key);
}
/**
* @return The scheduled timeout for the key; null otherwise.
*/
public ScheduledTimeout<T> timeout(String key) {
return timeoutMap.get(key);
}
/**
* @return The number of scheduled timeouts.
*/
public int size() {
return timeoutMap.size();
}
/**
* @return A list of expired timeouts based on the current time.
*/
public List<ExpiredTimeout<T>> poll() {
List<ExpiredTimeout<T>> results = new ArrayList<>();
ScheduledTimeout<T> timeout = timeoutQueue.peek();
while (timeout != null && timeout.deadlineMs <= time.milliseconds()) {
timeoutQueue.poll();
timeoutMap.remove(timeout.key, timeout);
results.add(new ExpiredTimeout<>(
timeout.key,
timeout.operation.generateRecords()
));
timeout = timeoutQueue.peek();
}
return results;
}
}

21
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinatorTest.java

@ -33,6 +33,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -33,6 +33,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
@ -41,6 +42,8 @@ import java.util.Collections; @@ -41,6 +42,8 @@ import java.util.Collections;
import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -305,4 +308,22 @@ public class ReplicatedGroupCoordinatorTest { @@ -305,4 +308,22 @@ public class ReplicatedGroupCoordinatorTest {
new ApiMessageAndVersion(value, (short) 0)
)));
}
@Test
public void testOnLoaded() {
MetadataImage image = MetadataImage.EMPTY;
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
ReplicatedGroupCoordinator coordinator = new ReplicatedGroupCoordinator(
groupMetadataManager
);
coordinator.onLoaded(image);
verify(groupMetadataManager, times(1)).onNewMetadataImage(
eq(image),
any()
);
verify(groupMetadataManager, times(1)).onLoaded();
}
}

Loading…
Cancel
Save