Browse Source

KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)

This path enables the new group metadata manager to generate GroupMetadataKey/Value records.

Reviewers: David Jacot <djacot@confluent.io>
pull/13745/head
Jeff Kim 1 year ago committed by GitHub
parent
commit
c98c1ed41c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      checkstyle/suppressions.xml
  2. 81
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
  3. 217
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
  4. 14
      server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
  5. 16
      server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java

2
checkstyle/suppressions.xml

@ -324,6 +324,8 @@ @@ -324,6 +324,8 @@
files="(ConsumerGroupMember).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest).java"/>
<!-- storage -->
<suppress checks="CyclomaticComplexity"

81
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java

@ -31,7 +31,11 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -31,7 +31,11 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
import java.util.List;
@ -339,7 +343,7 @@ public class RecordHelpers { @@ -339,7 +343,7 @@ public class RecordHelpers {
* Creates a ConsumerGroupCurrentMemberAssignment tombstone.
*
* @param groupId The consumer group id.
* @param memberId The consumer group member id.
* @param memberId The consumer group member id.
* @return The record.
*/
public static Record newCurrentAssignmentTombstoneRecord(
@ -357,6 +361,81 @@ public class RecordHelpers { @@ -357,6 +361,81 @@ public class RecordHelpers {
);
}
/**
* Creates a GroupMetadata record.
*
* @param group The generic group.
* @param metadataVersion The metadata version.
* @return The record.
*/
public static Record newGroupMetadataRecord(
GenericGroup group,
MetadataVersion metadataVersion
) {
List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>(group.allMembers().size());
group.allMembers().forEach(member -> {
byte[] subscription = group.protocolName().map(member::metadata).orElse(null);
if (subscription == null) {
throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.");
}
byte[] assignment = member.assignment();
if (assignment == null) {
throw new IllegalStateException("Attempted to write member " + member.memberId() +
" of group + " + group.groupId() + " with no assignment.");
}
members.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId(member.memberId())
.setClientId(member.clientId())
.setClientHost(member.clientHost())
.setRebalanceTimeout(member.rebalanceTimeoutMs())
.setSessionTimeout(member.sessionTimeoutMs())
.setGroupInstanceId(member.groupInstanceId().orElse(null))
.setSubscription(subscription)
.setAssignment(assignment)
);
});
return new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup(group.groupId()),
(short) 2
),
new ApiMessageAndVersion(
new GroupMetadataValue()
.setProtocol(group.protocolName().orElse(null))
.setProtocolType(group.protocolType().orElse(""))
.setGeneration(group.generationId())
.setLeader(group.leaderOrNull())
.setCurrentStateTimestamp(group.currentStateTimestampOrDefault())
.setMembers(members),
metadataVersion.groupMetadataValueVersion()
)
);
}
/**
* Creates a GroupMetadata tombstone.
*
* @param groupId The group id.
* @return The record.
*/
public static Record newGroupMetadataTombstoneRecord(
String groupId
) {
return new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup(groupId),
(short) 2
),
null // Tombstone
);
}
private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {

217
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java

@ -17,6 +17,9 @@ @@ -17,6 +17,9 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.consumer.ClientAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
@ -33,16 +36,30 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -33,16 +36,30 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.generic.Protocol;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
@ -60,6 +77,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme @@ -60,6 +77,7 @@ import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignme
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class RecordHelpersTest {
@ -383,4 +401,203 @@ public class RecordHelpersTest { @@ -383,4 +401,203 @@ public class RecordHelpersTest {
"member-id"
));
}
private static Stream<Arguments> metadataToExpectedGroupMetadataValue() {
return Stream.of(
Arguments.arguments(MetadataVersion.IBP_0_10_0_IV0, (short) 0),
Arguments.arguments(MetadataVersion.IBP_1_1_IV0, (short) 1),
Arguments.arguments(MetadataVersion.IBP_2_2_IV0, (short) 2),
Arguments.arguments(MetadataVersion.IBP_3_5_IV0, (short) 3)
);
}
@ParameterizedTest
@MethodSource("metadataToExpectedGroupMetadataValue")
public void testNewGroupMetadataRecord(
MetadataVersion metadataVersion,
short expectedGroupMetadataValueVersion
) {
Time time = new MockTime();
List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-1")
.setClientId("client-1")
.setClientHost("host-1")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-1")
.setSubscription(new byte[]{0, 1})
.setAssignment(new byte[]{1, 2})
);
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-2")
.setClientId("client-2")
.setClientHost("host-2")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-2")
.setSubscription(new byte[]{1, 2})
.setAssignment(new byte[]{2, 3})
);
Record expectedRecord = new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup("group-id"),
(short) 2),
new ApiMessageAndVersion(
new GroupMetadataValue()
.setProtocol("range")
.setProtocolType("consumer")
.setLeader("member-1")
.setGeneration(1)
.setCurrentStateTimestamp(time.milliseconds())
.setMembers(expectedMembers),
expectedGroupMetadataValueVersion));
GenericGroup group = new GenericGroup(
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
);
expectedMembers.forEach(member -> {
group.add(new GenericGroupMember(
member.memberId(),
Optional.of(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeout(),
member.sessionTimeout(),
"consumer",
Collections.singletonList(new Protocol(
"range",
member.subscription()
)),
member.assignment()
));
});
group.initNextGeneration();
Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
group,
metadataVersion
);
assertEquals(expectedRecord, groupMetadataRecord);
}
@Test
public void testNewGroupMetadataTombstoneRecord() {
Record expectedRecord = new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup("group-id"),
(short) 2),
null);
Record groupMetadataRecord = RecordHelpers.newGroupMetadataTombstoneRecord("group-id");
assertEquals(expectedRecord, groupMetadataRecord);
}
@Test
public void testNewGroupMetadataRecordThrowsWhenNullSubscription() {
Time time = new MockTime();
List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-1")
.setClientId("client-1")
.setClientHost("host-1")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-1")
.setSubscription(new byte[]{0, 1})
.setAssignment(new byte[]{1, 2})
);
GenericGroup group = new GenericGroup(
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
);
expectedMembers.forEach(member -> {
group.add(new GenericGroupMember(
member.memberId(),
Optional.of(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeout(),
member.sessionTimeout(),
"consumer",
Collections.singletonList(new Protocol(
"range",
null
)),
member.assignment()
));
});
assertThrows(IllegalStateException.class, () ->
RecordHelpers.newGroupMetadataRecord(
group,
MetadataVersion.IBP_3_5_IV2
));
}
@Test
public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
Time time = new MockTime();
List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-1")
.setClientId("client-1")
.setClientHost("host-1")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-1")
.setSubscription(new byte[]{0, 1})
.setAssignment(null)
);
GenericGroup group = new GenericGroup(
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
);
expectedMembers.forEach(member ->
group.add(new GenericGroupMember(
member.memberId(),
Optional.of(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeout(),
member.sessionTimeout(),
"consumer",
Collections.singletonList(new Protocol(
"range",
member.subscription()
)),
member.assignment()
))
);
assertThrows(IllegalStateException.class, () ->
RecordHelpers.newGroupMetadataRecord(
group,
MetadataVersion.IBP_3_5_IV2
));
}
}

14
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

@ -365,6 +365,20 @@ public enum MetadataVersion { @@ -365,6 +365,20 @@ public enum MetadataVersion {
}
}
public short groupMetadataValueVersion() {
if (this.isLessThan(IBP_0_10_1_IV0)) {
return 0;
} else if (this.isLessThan(IBP_2_1_IV0)) {
return 1;
} else if (this.isLessThan(IBP_2_3_IV0)) {
return 2;
} else {
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
return 3;
}
}
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{

16
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java

@ -312,4 +312,20 @@ class MetadataVersionTest { @@ -312,4 +312,20 @@ class MetadataVersionTest {
}
assertEquals(expectedVersion, metadataVersion.registerBrokerRecordVersion());
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testGroupMetadataValueVersion(MetadataVersion metadataVersion) {
final short expectedVersion;
if (metadataVersion.isAtLeast(MetadataVersion.IBP_2_3_IV0)) {
expectedVersion = 3;
} else if (metadataVersion.isAtLeast(IBP_2_1_IV0)) {
expectedVersion = 2;
} else if (metadataVersion.isAtLeast(IBP_0_10_1_IV0)) {
expectedVersion = 1;
} else {
expectedVersion = 0;
}
assertEquals(expectedVersion, metadataVersion.groupMetadataValueVersion());
}
}

Loading…
Cancel
Save